

Audience: Diverse Background

Time: 2-day workshop (12 hours)

Pre-Requisites: Exposure to Python Programming

Brief Description: Spark is a Distributed Computing Framework used for processing, querying and analysing Big Data. It is open source and can run tasks faster than traditional software. It is written in the programming language Scala, but also has support for other widely used programming languages: Java, Python, R and SQL. In this course, we learn Spark using Python (pySpark).

Aims, Objectives and Intended Learning Outcomes: Before attending this course you were given instructions on how to register, log in and start a Cluster and a Notebook on https://databricks.com. You should have completed all of these steps before starting this course.

By the end of Chapter 1 you should understand and be able to explain:

  1. What Big Data is and its characteristics

  2. What a Distributed Computing Framework (DCF) is

  3. What Spark is, its relation to DCFs, what it can do, and its advantages and disadvantages

  4. How Spark runs on a Cluster

  5. The optimal way to handle partitioning

  6. What a SparkSession is

By the end of Chapter 2, you should understand and be able to explain:

  1. What DataFrames are, why we use them, and advantages and disadvantages.

You should be able to:

  1. Create DataFrames from raw data, or from data files.

By the end of Chapter 3, you should be able to investigate data by displaying:

  1. The data

  2. The data structure

  3. The column names and

  4. Selected parts of the data

By the end of Chapter 4, you should understand what a Join of DataFrames is, and be able to explain the difference between the different joins. You should be able to Join DataFrames together with a:

  1. Left Join

  2. Right Join

  3. Inner Join and

  4. Outer Join.

You should also be able to union DataFrames.

By the end of Chapter 5, you should be able to perform Data querying and manipulation. You should be able to:

  1. Remove, Add, Rename a column

  2. Filter on a condition of a column and on multiple columns

  3. Replace values dependent on conditions

  4. Check for nulls

  5. Drop Duplicates

  6. Aggregate Data

  7. Do Summary Statistics

By the end of Chapter 6, you should be able to:

  1. Use .repartition() and .coalesce() to obtain the optimal partitioning for your DataFrame and the cores you have available. You need to understand what the optimal partitioning is.

  2. Save data in a json or csv format.

By the end of Chapter 7, you should be able to:

  1. Take a sample from the Data

  2. Convert Spark DataFrames into pandas DataFrames

  3. Make a simple scatterplot in Python.

By the end of Chapter 8, you should understand:

  1. How to create a SparkSession in the ONS Data Service to be able to run pyspark commands.

By the end of Chapter 9 you should:

  1. Feel confident to deal with Data using pyspark.

  2. Be able to combine what we learnt in the previous Chapters to investigate and manipulate Data.

  3. Be able to investigate the current partitioning of your DataFrame, calculate the optimal partitioning based on your DataFrame and your resources, and make the necessary changes to achieve it.

Datasets: department_budget.csv, Travel_to_Work_Areas.csv, Travel_to_Work_Small_Areas.csv, Item_Price_Data.csv, Item_Defect_Dataset.csv, Item_Defect_Dataset_Modified.csv

Libraries: pySpark

Images: Apart from a few bespoke images created for this course, most of the images used are taken from https://databricks.com/. Images taken from other sources have a caption under them detailing their source.

Acknowledgements: We would like to thank the people who took the time to review the course material: Arturas Eidukas and Isabela Breton; and those who piloted the course for us and provided exceptionally useful feedback: Sonia Williams, Sonia Mazzi, Matt Wenham, Chris Bonham, and Raul Garcia.

1 Introduction to Spark


Intended Learning Outcomes: Before attending this course you were given instructions on how to register, log in and start a Cluster and a Notebook on https://databricks.com. You should have completed all of these steps before starting this course.

By the end of Chapter 1 you should understand and be able to explain:

  1. What Big Data is and its characteristics

  2. What a Distributed Computing Framework (DCF) is

  3. What Spark is, its relation to DCFs, what it can do, and its advantages and disadvantages

  4. How Spark runs on a Cluster

  5. The optimal way to handle partitioning

  6. What a SparkSession is


1.1 Big Data

The term Big Data refers to data sets that are too large and complex for traditional software to handle. Nowadays, such data sets are everywhere and contain information that we want to reveal. Big Data is characterised by high volume, velocity, variety and veracity (how noisy the dataset is).

Challenge: Specific technology is required to capture, store, analyse and visualise this kind of data set.

Questions:

  1. What is Big Data?

  2. Name and explain the four characteristics of Big Data

1.2 Distributed Computing Framework (DCF)

Distributed Computing is a system where multiple machines are working and communicating simultaneously with each other. It is used when there is a need for fast processing of a huge amount of data. The data is split into discrete, non-overlapping sections, with each machine running operations on one section of data. The cluster as a whole then reports up to the driver any results from the operations.

Famous Example of how DCF works:

I am in a library and I want to count the exact number of books in an hour. To achieve that, I call as many people as possible and divide the areas among them in a non-overlapping system. I ask them to be back right before the hour. Once they are back, I simply add the numbers to calculate the exact number of books in the library.
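The same divide-and-aggregate pattern can be sketched in plain Python. This is only an illustration of the concept (not how Spark itself is implemented): split the data into non-overlapping chunks, count each chunk independently as each helper would, then add up the partial results.

```python
# Illustrative only: divide a collection into non-overlapping sections,
# "count" each section independently (as each helper in the library would),
# then aggregate the partial results - the same pattern a DCF follows.

def split_into_chunks(items, n_chunks):
    """Divide items into n_chunks non-overlapping sections."""
    chunk_size = -(-len(items) // n_chunks)  # ceiling division
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

books = [f"book_{i}" for i in range(1000)]               # the whole library
sections = split_into_chunks(books, 4)                   # 4 helpers, 4 areas
partial_counts = [len(section) for section in sections]  # each helper counts
total = sum(partial_counts)                              # add up the results

print(total)  # 1000 - the exact number of books
```

In Spark, the "helpers" are the worker nodes and the aggregation step is handled for you by the framework.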


Apache Hadoop and Apache Spark are designed to do exactly that. They are designed to make resource allocation and result collecting a straightforward process.

Questions:

  1. Explain in your own words what a DCF is.

  2. Give an example of a DCF, like the one mentioned above.

1.3 What is Spark?

Spark is a relatively new DCF used for processing, querying and analysing Big Data. It is open source and can run tasks faster than previous DCFs by using in-memory computation. It is written in the programming language Scala, but also has support for other widely used programming languages: Java, Python, R and SQL, and is used as an interface to control entire clusters of computers.

Question:

  1. How can Spark help with the Big Data challenges?

1.4 Why use Spark?

1.4.1 Over traditional software

Spark is inherently scalable, and can handle vast amounts of data beyond the reach of traditional software and data analysis methods. Even for datasets too large to be held in a single computer's memory, operations can be run over the entire data at once.

1.4.2 Over other DCFs

Spark’s key advantage is the ability to perform in-memory computation. The entire data set can be held across the distributed cluster, and operations and transformations can be applied without having to write to a hard drive. Previous DCFs such as Hadoop had to read and write to the file system with each transformation, adding a huge time overhead (especially when the data is very large).

Spark also has Lazy Evaluation - it will only evaluate commands when needed, and will identify the optimal, most efficient way to perform a series of commands. For example, Spark can be tasked to perform many transformations to manipulate the data, but until a command requiring a direct response - such as a count of the data - is issued, the manipulations are queued up and optimised.
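Python generators give a rough feel for lazy evaluation. This is only an analogy (Spark's optimiser is far more sophisticated): the "transformations" below do no work at all until a result is actually demanded.

```python
# Analogy only: like Spark transformations, generator expressions are lazy -
# nothing is computed until an "action" demands a result.

data = range(1, 1_000_001)

doubled = (x * 2 for x in data)             # "transformation": queued, not run
evens = (x for x in doubled if x % 4 == 0)  # another queued "transformation"

# Only now, at the "action", does any computation actually happen -
# and only as much as is needed to produce the five requested values.
first_five = [next(evens) for _ in range(5)]
print(first_five)  # [4, 8, 12, 16, 20]
```

Defining `doubled` and `evens` appears to happen instantly, however large `data` is; the work only happens when values are pulled out. Spark's transformations and actions behave analogously, with the extra benefit that the queued transformations are optimised as a group.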

1.4.3 Advantages, Disadvantages, Limitations of using Spark

Advantages:

  1. Fast processing of Big Data compared with other DCFs

  2. Capable of handling large data sets where traditional software fails

  3. Supports multiple languages (Scala, Java, R, Python)

  4. Lazy Evaluation: data is evaluated only when an action is called for computation.

Disadvantages and Limitations:

  1. It does not have its own file management system (relying on Hadoop or other platforms). When used with Hadoop, it works best with a small number of large files, and performs poorly when the data is spread across a lot of small files.

  2. Some failures are so vague that it is not easy to understand what they mean and where they are coming from.

  3. There are not as many ready-made algorithms as in traditional languages (e.g. Python).

  4. Python and R users are always a step behind in updates compared to Scala and Java users - features can be slow to be implemented.

Question:

  1. Why would someone prefer to use Spark instead of traditional software?

1.5 Who uses Spark?

Data engineers, data scientists, application developers.

Companies such as TripAdvisor, Yahoo!, eBay, MyFitnessPal, and the ONS.

1.6 How does Spark run on a Cluster

A Cluster is the group of computers (nodes) which are all connected and coordinated together to perform tasks. Your own laptop could be part of this group of computers, or the one controlling them.

The Driver Process runs the main program, and queues tasks for the cluster to complete. Any results get collected and brought back to the driver.

Spark uses a Cluster Manager to coordinate the work across such a cluster for you; it directs the workers of the cluster as to which of the tasks set by the Driver they should be doing.

The Executors, or workers, do the assigned tasks (executing code) and report back to the Driver.

Image taken from: https://annefou.github.io/pyspark/03-pyspark_context/ by Anne Fouilloux.

1.7 Partitioning

Note:

Proper Partitioning: In the diagram there are some cores that are not used because each worker has more cores than partitions and thus, we are wasting resources. We need to think about proper partitioning to optimize those resources.

1.7.1 Efficient Partitioning

The importance of partitioning cannot be overstated. Suppose that you have access to 8 cores. When thinking about partitioning, the primary aim should be to have the data equally distributed between the cores.

If we have 8 cores, and 1 million records, it stands to reason that we want 125,000 records on each core. Unbalanced partitioning can lead to a massive slowdown. We want the number of partitions to be a multiple of the number of cores, to ensure that every core is being used equally.

In the first diagram below, with 10 partitions, if we assume that each partition takes 10 seconds to process, the total time will be 20 seconds - 10 seconds for the first 8 partitions, and 10 seconds for the last 2. During the last 2, 6 cores are sitting idle.

In the second diagram, there are half as many partitions, each twice as large as before. The processing will still take 20 seconds, but 3 cores will sit idle for the entire time.

In the diagram below, with the data equally split into 8 partitions, each partition will take 12.5 seconds to process. As each core has only 1 partition, the overall processing time will be 12.5 seconds, with no idle cores.
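The arithmetic behind these diagrams can be checked directly. Assuming, as above, 8 cores, 1 million records, and a processing rate where 100,000 records take 10 seconds, the total time is the per-partition time multiplied by the number of "waves" of partitions the cores must work through:

```python
import math

def processing_time(n_records, n_partitions, n_cores, records_per_second):
    """Total time: each wave processes up to n_cores partitions in parallel."""
    per_partition = (n_records / n_partitions) / records_per_second
    waves = math.ceil(n_partitions / n_cores)
    return waves * per_partition

RATE = 10_000  # records per second, i.e. 100,000 records take 10 seconds

print(processing_time(1_000_000, 10, 8, RATE))  # 20.0 - two waves of 10s each
print(processing_time(1_000_000, 5, 8, RATE))   # 20.0 - one wave, 3 cores idle
print(processing_time(1_000_000, 8, 8, RATE))   # 12.5 - one balanced wave
```

The best time comes from matching the number of partitions to a multiple of the number of cores, so that every wave keeps every core busy.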

Questions:

  1. Try to explain how Spark runs on a Cluster.

  2. Try to explain how we should handle partitioning.

1.8 How do I use Spark?

Spark can be run locally, using just the resources of your laptop/computer, or on a cluster. To run Spark on a local machine you need to install Java. If you want to use the Python API, you will need a Python interpreter and if you want to use R, then you need R on your machine. This can be done without any distributed storage system, and is useful for prototyping, development, and debugging.

To harness the full potential of a fully operational cluster, there are two main providers of cloud-based Spark facilities: Databricks and Cloudera. Databricks has a web-based interface that is available for free in the Databricks Community Edition https://databricks.com/try-databricks, which we will be using today.

Within the ONS, the Data Access Platform runs an internally hosted Cloudera Data Science Workbench (CDSW) environment, which allows user-definable numbers of clusters, memory allocation, and access to the secure data stored within HUE. HUE, or Hadoop User Experience, is a web interface which gives access to SQL databases, tables, and other data files hosted internally within the ONS. Within CDSW, users can write Python, R, or Scala code to interact with clusters.

1.9 Starting with pySpark

The first thing a Spark program must do is create a SparkSession object, which tells Spark how to access a cluster. A simple SparkSession is created below, but it can be tailored (see Chapter 8).

Within Databricks, a SparkSession is automatically created with the name spark, ready for use; however, within the ONS Data Service, you will need to create your own.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

You can find pySpark Documentation at https://spark.apache.org/docs/latest/api/python/index.html

Question:

  1. When do we have to define the SparkSession?


1.10 Intended Learning Outcomes

Now, you should understand and be able to explain:

  1. What Big Data is and its characteristics

  2. What a Distributed Computing Framework (DCF) is

  3. What Spark is, its relation to DCFs, what it can do, and its advantages and disadvantages

  4. How Spark runs on a Cluster

  5. The optimal way to handle partitioning

  6. What a SparkSession is


2 DataFrames


Intended Learning Outcomes:

By the end of this chapter, you should understand and be able to explain:

  1. What DataFrames are, why we use them, and advantages and disadvantages.

You should be able to:

  1. Create DataFrames from raw data, or from data files.


2.1 What are they

A DataFrame is a collection of data, organised into named columns - with variables in columns and observations in rows. They are similar to dataframes in R and Python (from the pandas package), and are built on a foundation of RDDs - as such they share many characteristics, such as immutability. Fundamentally, DataFrames are an expansion of RDDs - they are more efficient, have a large number of convenient built-in functions, and handle data in a much more structured manner, which is far easier for users to understand intuitively.

DataFrames contain rows of data, with a well defined schema which illustrates the structure of the data - its column names and data types.

More information can be found at the links below: https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/

Note:

  1. Structured Data - information which can be stored in an SQL database in a table with rows and columns and are co-related.

  2. Semi-Structured Data - information that is not in a relational database but is organised in a way that makes it easier to analyse, e.g. csv, json, xml files.

  3. Unstructured Data - everything else such as videos, photos, audio files.

2.2 Why use them

DataFrames can handle structured data, overcoming one of RDDs' key limitations.

DataFrames also have APIs available in Python, R, Java, and Scala, and are generally more accessible than RDDs for those new to Spark. However, they don't offer the low-level functionality and control of RDDs.

DataFrames also support different data formats (csv, parquet, Cassandra) and storage systems (Hive tables, mysql etc.).

2.2.1 Actions and Transformations

As with the rest of Spark, there are two types of operations which can be performed on a DataFrame: transformations and actions. Any method which takes a DataFrame, manipulates it in some way and returns a new version is a transformation, while any method which returns something from within the DataFrame (e.g. a count of records, a print of the first few rows) is an action. Due to Spark's lazy evaluation, transformations are queued up until an action is called.

As such, running code with just transformations will appear to happen instantly, while then running a .show() method to look at the results will take longer - the actual processing occurs at the action call.

From the Python API, DataFrames have much greater performance than RDDs, due to the inclusion of a built-in optimiser. As DataFrames have transformations and actions, transformations will be added to Spark's in-built optimiser until an action is called. At this point, the optimiser will determine the most efficient way of completing the required transformations.

Question

  1. Why do we recommend DataFrames over RDDs?

2.3 Advantages, Drawbacks and Limitations

Advantages

  1. There is no difference in performance between Scala and Python.

  2. Massive performance benefits compared with RDDs.

  3. Much more accessible than RDDs - far simpler to operate and use efficiently.

Drawbacks and Limitations:

  1. Once we have transformed an RDD into a DataFrame we cannot recover the original RDD.

  2. You sometimes have to resort to accessing the underlying RDD for increased capability.

A comprehensive comparison of RDDs and DataFrames can be found here: https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/

Question

  1. What are some of the advantages and disadvantages of DataFrames?

2.4 Create them manually

A SparkSession is needed to create DataFrames, export DataFrames as tables, and execute SQL over tables. Within Databricks, one is created automatically with the name spark. Within the ONS Data Service, you’ll need to create one manually.

In the last Chapter we will learn how to modify the arguments for the SparkSession.

To create a DataFrame from raw data, define a list of column names, and a list of data, and pass them both to spark.createDataFrame(data, columns). As we can see below, the data should be provided on a row-by-row basis, rather than on a column-by-column basis.

columns = ['id', 'height(cm)', 'weight(kg)']
values = [('A', 160, 60), ('B', 180, 70), ('D', 150, 80)]

DataFrame_from_Scratch = spark.createDataFrame(values, columns)
DataFrame_from_Scratch.show()
## +---+----------+----------+
## | id|height(cm)|weight(kg)|
## +---+----------+----------+
## |  A|       160|        60|
## |  B|       180|        70|
## |  D|       150|        80|
## +---+----------+----------+

Exercises

  1. Create a dataframe with column ‘department’, containing values ‘Legal’,‘Applied Science’, ‘Head Office’, and ‘Methodology’, and a column ‘employees’, with values: 11, 49, 5, and 27. The dataframe should have column names of ‘department’ and ‘employees’. Name the dataframe something clear and memorable e.g. ‘department_employees’.

  2. Display the contents of the dataframe by using .show(). Ensure that the column names and department names are exactly as specified above, as they will be used in later exercises.

Answers

columns = ['department', 'employees']
data = [('Legal', 11), 
        ('Applied Science', 49),
        ('Head Office', 5), 
        ('Methodology', 27)]
        
department_employees = spark.createDataFrame(data, columns)
department_employees.show()
## +---------------+---------+
## |     department|employees|
## +---------------+---------+
## |          Legal|       11|
## |Applied Science|       49|
## |    Head Office|        5|
## |    Methodology|       27|
## +---------------+---------+

Guidance

  • Make sure your column names are identical to those in this answer - lower case for both department and employees
  • Make sure your department names are also identical to the examples above - capital letters for each word in a department name.
  • As displayed in the answer, the data list is made up of 4 elements, one per row in the final DataFrame. Each element is a tuple which contains data for every column in the desired DataFrame.
  • When you create a DataFrame and want to use it later on, always save it as a new variable, and then call .show() on that new variable on the line below. Never do something like the following:
problem_dataframe = spark.createDataFrame(data, columns).show()
  • This will display the desired DataFrame, but the variable problem_dataframe will not be a DataFrame - it will be a Nonetype object as it is set equal to the value that .show() returns, and .show() returns nothing!
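This pitfall is ordinary Python behaviour, not something specific to Spark. Any function that prints its result but returns nothing behaves the same way, as this plain-Python illustration (with a made-up show_rows helper) demonstrates:

```python
def show_rows(rows):
    # Like DataFrame.show(): prints the rows but returns None.
    for row in rows:
        print(row)

data = [('Legal', 11), ('Methodology', 27)]

good = data                # keep a reference to the data itself
problem = show_rows(data)  # prints the rows, but...

print(problem is None)  # True - 'problem' holds None, not the data
```

Assigning the result of a display call throws the data away; keep the DataFrame in its own variable and display it on a separate line.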

2.5 Import data from external sources

2.5.1 From a CSV

spark.read.csv(path, header) will read in a CSV file, assign the data to a DataFrame, and, if header is set, set the column names to the first row of the file. There are a huge number of other arguments which can be set, including sep to change the separator from a comma, and inferSchema to control whether Spark should attempt to infer datatypes automatically from the data.

Item_Price_DataFrame = spark.read.csv('./data/Item_Price_Data.csv', header=True)
Item_Price_DataFrame.show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

Within Databricks, the filepath needed is 'FileStore/tables/filename.csv'.

Within the ONS Data Service, filepaths should start hdfs://prod1/ and from then have the path from within HUE - the Hadoop User Experience file storage solution - appended, e.g. for the ONS BRES dataset, use hdfs://prod1/dapsen/landing_zone/ons/bres22. Full details can be found at http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv

2.5.2 From a json file

spark.read.json(path) will read the json file found at path, and assign it to a DataFrame. As with read.csv, there are many additional arguments that can be passed to the function.

Json_File_Item_Price = spark.read.json("./data/Json_File_Item_Price.json")
Json_File_Item_Price.show()
## +----+-----------+---------------+-----+
## |Code|       Item|Last_Year_Price|Price|
## +----+-----------+---------------+-----+
## |  22|Black Chair|             70|  100|
## |   3|White Table|            350|  500|
## |  16| Floor Lamp|             50|   60|
## |  12|      Couch|            900| 1000|
## |   3|White Table|            499|  500|
## +----+-----------+---------------+-----+

2.5.3 From an SQL table

SQL commands can be used to read from tables in databases. The below example will accept any valid SQL expression.

Within the ONS Data Service, most data will be stored in SQL tables, so this will probably be the most common import method you come across. Importantly, SQL tables include metadata such as column datatypes - which aren’t explicitly defined in CSV files.

SQL_Item_Price_example = spark.sql('SELECT * FROM database_name.table_name')


Exercises

  1. Import the table ‘department_budget’ into a dataframe using the sql example above. This contains department names, divisions, and budget data - name it something suitable e.g. ‘department_budget’.

  2. Display some of the contents of the dataframe, to understand the structure of the data.

Answers

department_budget = spark.sql('SELECT * FROM default.department_budget')
department_budget.show()
## +--------------------+-----------+------+
## |          department|   division|budget|
## +--------------------+-----------+------+
## |                  HR|      Admin|  7852|
## |             Finance|      Admin|  8541|
## |               Legal|      Admin|  9656|
## |       International|      Admin|  1913|
## |         IT Services|  Technical|  7420|
## |         Procurement|      Admin|   744|
## |                 R&D| Scientific|  8389|
## |Software Engineering|Engineering|  6109|
## | Systems Engineering|Engineering|  6564|
## |              Legacy|  Technical|  2581|
## |               Space| Scientific|  7505|
## |             Transit|Engineering|  7069|
## |            Autonomy|Engineering|  4060|
## |                 Air|Engineering|  8712|
## |            Maritime|Engineering|  8480|
## |    Customer Support|      Admin|  8293|
## |            Training|  Technical|  5241|
## |              Brexit|      Admin|  8886|
## |            Planning|      Admin|  5484|
## |           Executive|      Admin|  6967|
## +--------------------+-----------+------+
## only showing top 20 rows

To read in the file from a csv, you could do the following instead:

department_budget = spark.read.csv('./data/department_budget.csv', header = True, inferSchema = True)

where inferSchema = True lets PySpark work out the data types of the columns, and header = True takes the first row and uses it as column names.

Guidance

  • Within Databricks, your database name will be default and the table name will be whatever you called it when you imported it - which will by default be department_budget_csv. By going to the data tab on the left hand side of Databricks, you can see your table names, and by mousing over the desired table the entire name should appear.
  • If your results look different, such as the columns being called _c0, _c1, you might have incorrectly uploaded the data. Delete the previous table of data by going to the data tab, finding the desired table, pressing the down-arrow to the right of the name and selecting Delete. Then, follow the onboarding instructions to reupload the data, making sure to tick the required check boxes on the left hand side!


2.6 Intended Learning Outcomes

Now, you should understand and be able to explain:

  1. What DataFrames are, why we use them, and advantages and disadvantages.

You should be able to:

  1. Create DataFrames from raw data, or from data files.


3 Investigate data


Intended Learning Outcomes:

By the end of this Chapter, you should be able to investigate data by displaying:

  1. The data

  2. The data structure

  3. The column names and

  4. Selected parts of the data


3.1 Observe raw data

.show(x) will display the first x rows of the DataFrame in a visual format - up to 20 if x is not set.

Item_Price_DataFrame.show(6)
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

If a cell in a column has more than 20 characters, the end will be truncated. This can often lose the exponent value of a number with a lot of decimals, but can be avoided by passing a second argument into the function - .show(6, False) - where the boolean controls whether or not to truncate.

data = [(1,1234567891234567891234510.9786)]
columns = ['idx', 'high_precision_data']
hpd = spark.createDataFrame(data, columns)
hpd.show()
## +---+--------------------+
## |idx| high_precision_data|
## +---+--------------------+
## |  1|1.234567891234567...|
## +---+--------------------+
hpd.show(1, False)
## +---+---------------------+
## |idx|high_precision_data  |
## +---+---------------------+
## |1  |1.2345678912345678E24|
## +---+---------------------+

Exercise

  1. Display the first 4 records of your department_budget DataFrame.

Answer

department_budget.show(4)
## +-------------+--------+------+
## |   department|division|budget|
## +-------------+--------+------+
## |           HR|   Admin|  7852|
## |      Finance|   Admin|  8541|
## |        Legal|   Admin|  9656|
## |International|   Admin|  1913|
## +-------------+--------+------+
## only showing top 4 rows

Guidance

  • Replace department_budget with the name of your budget DataFrame. Make sure to give your DataFrames sensible names to make referring back to them easier!
  • The displayed table says only showing top 4 rows - this means that there are more than 4 rows in the DataFrame, but not how many there are in total!

3.2 Return data for external use

.collect() will return a list of all rows of a DataFrame - avoid this if the size of the data is larger than the memory available to the driver. .take(n) can be used to return only n rows of the DataFrame.

print(Item_Price_DataFrame.collect())
## [Row(Item='Black Chair', Code='22', Price='100', Last_Year_Price='70', Quantity='10'), Row(Item='White Table', Code='3', Price='500', Last_Year_Price='350', Quantity='50'), Row(Item='Floor Lamp', Code='16', Price='60', Last_Year_Price='50', Quantity='1'), Row(Item='White Table', Code='3', Price='500', Last_Year_Price='499', Quantity='20'), Row(Item='Couch', Code='12', Price='1000', Last_Year_Price='900', Quantity='5'), Row(Item='White Table', Code='3', Price='500', Last_Year_Price='499', Quantity='20')]

Exercises

  1. Run .collect() on your department - employees DataFrame. Try to understand the results.

  2. In a new cell, run .take(n) on your department - employees DataFrame, and replace n to only return the first record.

Answers

print(department_employees.collect())
## [Row(department='Legal', employees=11), Row(department='Applied Science', employees=49), Row(department='Head Office', employees=5), Row(department='Methodology', employees=27)]
print(department_employees.take(1))
## [Row(department='Legal', employees=11)]

Guidance

  • The results are made up of Row objects, where each one is equal to a row of the DataFrame.
  • .take() just returns the very first row.
  • Remember, don’t use .collect() if you have a very large DataFrame! It will try to pull all of the data into the driver of your system, which could easily cause the driver to crash!

3.3 Observe data structure

.printSchema() can be used to print the schema for a DataFrame in a visual tree. A DataFrame’s schema consists of column names, datatypes, and any other key rules for columns.

Item_Price_DataFrame.printSchema()
## root
##  |-- Item: string (nullable = true)
##  |-- Code: string (nullable = true)
##  |-- Price: string (nullable = true)
##  |-- Last_Year_Price: string (nullable = true)
##  |-- Quantity: string (nullable = true)

.dtypes can also be used, to print out a list of tuples of 'Column name', 'datatype'.

print(Item_Price_DataFrame.dtypes)
## [('Item', 'string'), ('Code', 'string'), ('Price', 'string'), ('Last_Year_Price', 'string'), ('Quantity', 'string')]

Exercise

  1. What are the datatypes of the columns in your department - employees DataFrame?

Answer

Datatypes can be found by running either of the commands above:

department_employees.printSchema()
## root
##  |-- department: string (nullable = true)
##  |-- employees: long (nullable = true)
print(department_employees.dtypes)
## [('department', 'string'), ('employees', 'bigint')]

Guidance

  • If you do both, you might notice that printSchema says that the employees column is a long, while dtypes claims it is a bigint. Fundamentally, these are the same thing! They represent an integer, between -9223372036854775808 and 9223372036854775807!

3.4 List of Column Names

.columns just returns a list of column names.

print(Item_Price_DataFrame.columns)
## ['Item', 'Code', 'Price', 'Last_Year_Price', 'Quantity']

Exercise

  1. Print out a list of columns from your department - employees DataFrame.

Answer

columns = department_employees.columns
print(columns)
## ['department', 'employees']

3.5 Specify desired data

.select() can be used to select parts of the data - either a single column, or multiple columns. It takes a column name, multiple column names, or a list object containing column names.

Select a single column.

Item_Price_DataFrame.select("Item").show()
## +-----------+
## |       Item|
## +-----------+
## |Black Chair|
## |White Table|
## | Floor Lamp|
## |White Table|
## |      Couch|
## |White Table|
## +-----------+

Select two columns.

Item_Price_DataFrame.select("Item","Price").show()
## +-----------+-----+
## |       Item|Price|
## +-----------+-----+
## |Black Chair|  100|
## |White Table|  500|
## | Floor Lamp|   60|
## |White Table|  500|
## |      Couch| 1000|
## |White Table|  500|
## +-----------+-----+

Exercise

  1. Display only the first column of the department - employees DataFrame you created.

Answer

department_employees.select('department').show()
## +---------------+
## |     department|
## +---------------+
## |          Legal|
## |Applied Science|
## |    Head Office|
## |    Methodology|
## +---------------+
department_employees.select(['department']).show()
## +---------------+
## |     department|
## +---------------+
## |          Legal|
## |Applied Science|
## |    Head Office|
## |    Methodology|
## +---------------+
department_employees.select(columns[0]).show()
## +---------------+
## |     department|
## +---------------+
## |          Legal|
## |Applied Science|
## |    Head Office|
## |    Methodology|
## +---------------+

Guidance

  • All three of the above commands return the exact same results! The first one takes just a string, the second one takes a list, and the third one uses the list of column names created in the previous example!

3.6 Count the number of records

.count() can be used to count the number of records in a DataFrame. When running .count(), it is advisable to set the output to a variable so it can be accessed in the future without having to rerun the action.

data_count = Item_Price_DataFrame.count()
print(data_count)
## 6

Exercise

  1. Count the number of records in both your department DataFrames.

Answer


budget_count = department_budget.count()
print('budget: ', budget_count)
## budget:  33
employees_count = department_employees.count()
print('employees: ', employees_count)
## employees:  4

Guidance

  • Remember, always save your count as a new variable, and then print that variable! This will save you a lot of headaches if your session terminates due to inactivity!


3.7 Intended Learning Outcomes

Now, you should be able to investigate data by displaying:

  1. The data

  2. The data structure

  3. The column names and

  4. Selected parts of the data


4 Combining DataFrames


Intended Learning Outcomes:

By the end of this Chapter, you should understand what the Join of DataFrames is, and be able to explain the difference between the different joins. You should be able to Join DataFrames together with a:

  1. Left Join

  2. Right Join

  3. Inner Join and

  4. Outer Join.

  5. You should also be able to union DataFrames.


Joining DataFrames works the same as joining SQL tables.
[Venn diagrams of the Inner, Outer, Left and Right Joins - pictures taken from wikipedia.org]

There are many different ways to join DataFrames. The main types are Inner, Left, Right, and Outer.

4.1 Joins Example

4.2 Left Join

Let’s read in another set of data to join with our previous Item_Price_DataFrame.

Item_Defect_Data = spark.read.csv('./data/Item_Defect_Dataset.csv', header=True)
Item_Defect_Data.show()
## +-----------+----+---------+
## |       Item|Code|Defective|
## +-----------+----+---------+
## |Black Chair|  22|        4|
## |White Table|   3|       15|
## |      Couch|  12|        3|
## |   Red Door|  14|       41|
## +-----------+----+---------+

Left Joins will keep every record in the first DataFrame, and any matching data in the second DataFrame.

dataFrame.join(dataFrame2, on, how) will join dataFrame2 to dataFrame, by the method specified by how, and on the columns specified by on.

.join() will default to an inner join if how is not specified. on specifies the column or columns to join on, and can be extended - for example, when the DataFrames have different column names.

Item_left_join = Item_Price_DataFrame.join(Item_Defect_Data, on='Item', how='left') # Could also use 'left_outer'
Item_left_join.show()
## +-----------+----+-----+---------------+--------+----+---------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |Black Chair|  22|  100|             70|      10|  22|        4|
## |White Table|   3|  500|            350|      50|   3|       15|
## | Floor Lamp|  16|   60|             50|       1|null|     null|
## |White Table|   3|  500|            499|      20|   3|       15|
## |      Couch|  12| 1000|            900|       5|  12|        3|
## |White Table|   3|  500|            499|      20|   3|       15|
## +-----------+----+-----+---------------+--------+----+---------+

Exercises

  1. You would like to join your department DataFrames together, keeping only the data for which you have employee values. Will a left join be sufficient? What column should you join on?

  2. Create a new DataFrame for the join, name it something relevant, and show the results.

Note

This is the DataFrame you will be working from for the rest of this course. Don’t overwrite it with any different types of joins.

  3. What is the issue with this new joint DataFrame? It will be resolved in the next chapter.

Answers

A Left join will be enough to keep only data where you have employee values (the entirety of department_employees). However, a Right join could also be used, depending on the order of the DataFrames.

department_data = department_employees.join(department_budget, on = 'department', how = 'left')
department_data.show()
## +---------------+---------+----------+------+
## |     department|employees|  division|budget|
## +---------------+---------+----------+------+
## |          Legal|       11|     Admin|  9656|
## |Applied Science|       49|Scientific|  4781|
## |    Head Office|        5|      null|  null|
## |    Methodology|       27| Technical|  4644|
## +---------------+---------+----------+------+

This second example holds the exact same data, just with a slightly different column order.

department_budget.join(department_employees, on = 'department', how = 'right')
## DataFrame[department: string, division: string, budget: int, employees: bigint]

As we have kept all of the data from department_employees, a few null values appear, as there is no Head Office record in the budget DataFrame! We will resolve these at a later stage.

Guidance

  • Keep track of this department_data DataFrame! You will need it at the beginning of the next section!

If you are overwriting previous DataFrames, like I have above, be very careful never to write a line of code like the following:

department_data = department_data.show()

This will display the DataFrame happily (due to the .show()), but will set department_data equal to None. Any future operations on department_data will then fail, with an error like: AttributeError: 'NoneType' object has no attribute 'show'. If this occurs, you might have to rerun a large amount of code - from when the DataFrame was first created!

4.3 Right Join

Right Joins will keep every record in the second DataFrame, and any matching data in the first DataFrame.

Item_right_join = Item_Price_DataFrame.join(Item_Defect_Data, on='Item', how='right') # Could also use 'right_outer'
Item_right_join.show()
## +-----------+----+-----+---------------+--------+----+---------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |Black Chair|  22|  100|             70|      10|  22|        4|
## |White Table|   3|  500|            499|      20|   3|       15|
## |White Table|   3|  500|            499|      20|   3|       15|
## |White Table|   3|  500|            350|      50|   3|       15|
## |      Couch|  12| 1000|            900|       5|  12|        3|
## |   Red Door|null| null|           null|    null|  14|       41|
## +-----------+----+-----+---------------+--------+----+---------+

Exercise

  1. Show the results of joining your original DataFrames together, this time with a right join. Keep your DataFrames in the same order as in the above question. How does this differ from your original joint DataFrame? Don’t overwrite your left-joined DataFrame from above.

Answer

department_employees.join(department_budget, on = 'department', how = 'right').show()
## +--------------------+---------+-----------+------+
## |          department|employees|   division|budget|
## +--------------------+---------+-----------+------+
## |       International|     null|      Admin|  1913|
## |    Customer Support|     null|      Admin|  8293|
## |            Maritime|     null|Engineering|  8480|
## |             Transit|     null|Engineering|  7069|
## |          Accounting|     null|      Admin|  3275|
## |                  HR|     null|      Admin|  7852|
## |         IT Services|     null|  Technical|  7420|
## |Facilities Manage...|     null|      Admin|   114|
## |              Legacy|     null|  Technical|  2581|
## |             Finance|     null|      Admin|  8541|
## |     Applied Science|       49| Scientific|  4781|
## |           Executive|     null|      Admin|  6967|
## |            Planning|     null|      Admin|  5484|
## |          Production|     null|Engineering|  4671|
## |              Brexit|     null|      Admin|  8886|
## |Radio Communications|     null| Scientific|  3370|
## |            Graphics|     null|  Technical|  9724|
## |            Autonomy|     null|Engineering|  4060|
## |                Mail|     null|      Admin|  4575|
## | Economic Statistics|     null|  Technical|  9629|
## +--------------------+---------+-----------+------+
## only showing top 20 rows

4.4 Inner Join

Inner Joins will return only records which can be joined in both DataFrames.

Item_inner_join = Item_Price_DataFrame.join(Item_Defect_Data, on='Item', how='inner')
Item_inner_join.show()
## +-----------+----+-----+---------------+--------+----+---------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |Black Chair|  22|  100|             70|      10|  22|        4|
## |White Table|   3|  500|            350|      50|   3|       15|
## |White Table|   3|  500|            499|      20|   3|       15|
## |      Couch|  12| 1000|            900|       5|  12|        3|
## |White Table|   3|  500|            499|      20|   3|       15|
## +-----------+----+-----+---------------+--------+----+---------+

Exercise

  1. Show the results of joining your original DataFrames together, this time with an inner join. How does this differ from your original DataFrame? Don’t overwrite your left-joined DataFrame from above.

Answer

department_employees.join(department_budget, on = 'department', how = 'inner').show()
## +---------------+---------+----------+------+
## |     department|employees|  division|budget|
## +---------------+---------+----------+------+
## |          Legal|       11|     Admin|  9656|
## |Applied Science|       49|Scientific|  4781|
## |    Methodology|       27| Technical|  4644|
## +---------------+---------+----------+------+

4.5 Outer Join

Outer Joins will keep all data from both DataFrames, and overlap them where possible.

Item_outer_join = Item_Price_DataFrame.join(Item_Defect_Data, on='Item', how='outer')
Item_outer_join.show()
## +-----------+----+-----+---------------+--------+----+---------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |   Red Door|null| null|           null|    null|  14|       41|
## |Black Chair|  22|  100|             70|      10|  22|        4|
## |      Couch|  12| 1000|            900|       5|  12|        3|
## | Floor Lamp|  16|   60|             50|       1|null|     null|
## |White Table|   3|  500|            350|      50|   3|       15|
## |White Table|   3|  500|            499|      20|   3|       15|
## |White Table|   3|  500|            499|      20|   3|       15|
## +-----------+----+-----+---------------+--------+----+---------+

Exercise

  1. Show the results of joining your original DataFrames together, this time with an outer join. How does this differ from your original DataFrame? Don’t overwrite your left-joined DataFrame from above.

Answer

department_employees.join(department_budget, on = 'department', how = 'outer').show()
## +--------------------+---------+-----------+------+
## |          department|employees|   division|budget|
## +--------------------+---------+-----------+------+
## |       International|     null|      Admin|  1913|
## |    Customer Support|     null|      Admin|  8293|
## |            Maritime|     null|Engineering|  8480|
## |             Transit|     null|Engineering|  7069|
## |          Accounting|     null|      Admin|  3275|
## |                  HR|     null|      Admin|  7852|
## |         IT Services|     null|  Technical|  7420|
## |Facilities Manage...|     null|      Admin|   114|
## |              Legacy|     null|  Technical|  2581|
## |             Finance|     null|      Admin|  8541|
## |         Head Office|        5|       null|  null|
## |     Applied Science|       49| Scientific|  4781|
## |           Executive|     null|      Admin|  6967|
## |            Planning|     null|      Admin|  5484|
## |          Production|     null|Engineering|  4671|
## |              Brexit|     null|      Admin|  8886|
## |Radio Communications|     null| Scientific|  3370|
## |            Graphics|     null|  Technical|  9724|
## |            Autonomy|     null|Engineering|  4060|
## |                Mail|     null|      Admin|  4575|
## +--------------------+---------+-----------+------+
## only showing top 20 rows

4.6 Unioning DataFrames

While joining combines DataFrames horizontally - adding extra columns - .union() can be used to combine DataFrames vertically - adding extra records.

.union() requires each DataFrame to have the same number of columns, but doesn’t check that the columns have the same names, datatypes etc. It simply appends the second DataFrame’s records to the first, matching columns up by position. As such, ensure that your data format is identical before performing a union - that your columns are in the same order in both DataFrames.

shuffled_Item_Price = Item_Price_DataFrame.select('Code','Price','Quantity','Item','Last_Year_Price')
Item_Price_DataFrame.union(shuffled_Item_Price).show() 
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## |         22| 100|   10|    Black Chair|      70|
## |          3| 500|   50|    White Table|     350|
## |         16|  60|    1|     Floor Lamp|      50|
## |          3| 500|   20|    White Table|     499|
## |         12|1000|    5|          Couch|     900|
## |          3| 500|   20|    White Table|     499|
## +-----------+----+-----+---------------+--------+

Instead of .union(), .unionByName() can be used. This pairs up columns by name, rather than by position. Unlike .union(), which doesn’t care about column names at all, .unionByName() requires each DataFrame to have the same column names, but the order can be different.

Item_Price_DataFrame.unionByName(shuffled_Item_Price).show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## |Black Chair|  22|  100|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

Exercises

  1. What would happen if you union your department-employees and department-budget DataFrames? Perform the transformation, and see if the results match your expectations.

  2. Create a new DataFrame containing your department_employees DataFrame unioned to itself, and call it unioned_employees. Show it to observe the results.

Answers

You should get an error when you union the two different DataFrames together! But pyspark is helpful here, and gives a useful error message. If you used union, it will tell you that the number of columns in the first DataFrame is not equal to the number of columns in the second. If you tried unionByName, it will tell you that a specific column does not appear in both of the DataFrames!

unioned_employees = department_employees.union(department_employees)
unioned_employees.show()
## +---------------+---------+
## |     department|employees|
## +---------------+---------+
## |          Legal|       11|
## |Applied Science|       49|
## |    Head Office|        5|
## |    Methodology|       27|
## |          Legal|       11|
## |Applied Science|       49|
## |    Head Office|        5|
## |    Methodology|       27|
## +---------------+---------+

OR:

unioned_employees = department_employees.unionByName(department_employees)

Guidance

  • Make sure to comment out the line which produced an error! Otherwise, you will be unable to run all of the cells in the future!
  • union and unionByName both have their uses.
  • If the DataFrames you are unioning have different column names, but the columns contain the same data in the same order, union will work best.
  • However, if the column names are identical, but the order has been scrambled, unionByName is better!
  • If the columns have different names AND are in a different order, some cleaning will need to be done before any unioning can occur!


4.7 Extended Joins

All of the joins performed above involved joining two DataFrames on one column, which is present and has the same name in both the original DataFrames. This can be expanded to multiple columns, or columns without matching names.

Join on different named columns

We’ll import a new example set of data.

Item_Defects_Modified = spark.read.csv('./data/Item_Defect_Dataset_Modified.csv', header = True)

Item_Defects_Modified.show()
## +-----------+---+---------+
## |Description| ID|Defective|
## +-----------+---+---------+
## |Black Chair| 22|        4|
## |White Table|  3|       15|
## |      Couch| 12|        3|
## |   Red Door| 14|       41|
## |White Table|  4|        3|
## +-----------+---+---------+

To join this data to our previous Item_Price_DataFrame, we need to match up the original Item column, with the new Description column.

When we perform the join, we directly match up column names from specific DataFrames.

Combined_Item_Data = Item_Price_DataFrame.join(Item_Defects_Modified, 
                                               on = (Item_Price_DataFrame['Item'] == Item_Defects_Modified['Description']), 
                                               how = 'inner')

Combined_Item_Data.show()
## +-----------+----+-----+---------------+--------+-----------+---+---------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Description| ID|Defective|
## +-----------+----+-----+---------------+--------+-----------+---+---------+
## |Black Chair|  22|  100|             70|      10|Black Chair| 22|        4|
## |White Table|   3|  500|            350|      50|White Table|  4|        3|
## |White Table|   3|  500|            350|      50|White Table|  3|       15|
## |White Table|   3|  500|            499|      20|White Table|  4|        3|
## |White Table|   3|  500|            499|      20|White Table|  3|       15|
## |      Couch|  12| 1000|            900|       5|      Couch| 12|        3|
## |White Table|   3|  500|            499|      20|White Table|  4|        3|
## |White Table|   3|  500|            499|      20|White Table|  3|       15|
## +-----------+----+-----+---------------+--------+-----------+---+---------+

This has the unfortunate effect of keeping both of the joining columns - in the example, Item and Description both appear in the results of the join, and they are identical. Dropping one, though, is a trivial case of passing it to .drop(), as we will see in the next section.

There is a fundamental issue with the results above, due to the White Table records; data has been duplicated, as the join was performed on non-unique data. This can be fixed by joining on both Item/Description, and Code/ID. Multiple conditions should each be wrapped in a bracket, and the entire set of conditions wrapped in brackets.

Combined_Item_Data = Item_Price_DataFrame.join(Item_Defects_Modified, 
                                               on = ((Item_Price_DataFrame['Item'] == Item_Defects_Modified['Description']) & (Item_Price_DataFrame['Code'] == Item_Defects_Modified['ID'])),
                                               how = 'inner')

Combined_Item_Data.show()
## +-----------+----+-----+---------------+--------+-----------+---+---------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Description| ID|Defective|
## +-----------+----+-----+---------------+--------+-----------+---+---------+
## |Black Chair|  22|  100|             70|      10|Black Chair| 22|        4|
## |White Table|   3|  500|            350|      50|White Table|  3|       15|
## |White Table|   3|  500|            499|      20|White Table|  3|       15|
## |      Couch|  12| 1000|            900|       5|      Couch| 12|        3|
## |White Table|   3|  500|            499|      20|White Table|  3|       15|
## +-----------+----+-----+---------------+--------+-----------+---+---------+

As each Item/Code pairing is unique, the join occurs as expected. Unintended duplication is very important to watch out for, and can be checked by performing .count()s before and after the join, and verifying that the results seem sensible. In one case, an incorrect join led to a 2TB set of data being created and saved to a database table. Unsurprisingly, writing, reading, and performing operations on this took an incredibly long time.

Linking data is a key part of Data Science, and this brief introduction to joins is just that - a brief introduction. There are a huge number of resources online about joining data, and any techniques used to join SQL tables should translate to pyspark.


4.8 Intended Learning Outcomes

Now, you should understand what the Join of DataFrames is, and be able to explain the difference between the different joins. You should also be able to Join DataFrames together with a:

  1. Left Join

  2. Right Join

  3. Inner Join and

  4. Outer Join.

  5. You should also be able to union DataFrames.


5 Data querying and manipulation


Intended Learning Outcomes:

By the end of this Chapter, you should be able to perform Data querying and manipulation. You should be able to:

  1. Remove, Add, Rename a column

  2. Filter on a condition of a column and on multiple columns

  3. Replace values dependent on conditions

  4. Check for nulls

  5. Drop Duplicates

  6. Aggregate Data

  7. Do Summary Statistics


When performing operations on DataFrames, there are usually equivalent SQL commands, and Python based functions. DataFrames can be queried using SQL syntax, or Python syntax, whichever is more convenient, and more familiar.

If you are performing transformations on a DataFrame, they will return a new DataFrame object. It is generally advisable to save that object to a variable, and then on a new line perform an action (such as .count(), or .show()) on the DataFrame. This allows you to keep using the DataFrame for future work.

5.1 Add a Column

Columns can be added to DataFrames by passing a column name and a column object into .withColumn(name, col). Column objects can be created in different ways, but a few examples can be seen below.

Arithmetic can be applied to existing columns by referencing them with col(column_name).

from pyspark.sql.functions import col

Item_Price_DataFrame.withColumn('Price_Change', col('Price') - col('Last_Year_Price')).show()
## +-----------+----+-----+---------------+--------+------------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Price_Change|
## +-----------+----+-----+---------------+--------+------------+
## |Black Chair|  22|  100|             70|      10|        30.0|
## |White Table|   3|  500|            350|      50|       150.0|
## | Floor Lamp|  16|   60|             50|       1|        10.0|
## |White Table|   3|  500|            499|      20|         1.0|
## |      Couch|  12| 1000|            900|       5|       100.0|
## |White Table|   3|  500|            499|      20|         1.0|
## +-----------+----+-----+---------------+--------+------------+

lit(value) can be used to add a column with the same value in each record.

from pyspark.sql.functions import lit

Item_Price_DataFrame.withColumn('Constant_Value', lit(4)).show()
## +-----------+----+-----+---------------+--------+--------------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Constant_Value|
## +-----------+----+-----+---------------+--------+--------------+
## |Black Chair|  22|  100|             70|      10|             4|
## |White Table|   3|  500|            350|      50|             4|
## | Floor Lamp|  16|   60|             50|       1|             4|
## |White Table|   3|  500|            499|      20|             4|
## |      Couch|  12| 1000|            900|       5|             4|
## |White Table|   3|  500|            499|      20|             4|
## +-----------+----+-----+---------------+--------+--------------+

Exercises

  1. Create a new DataFrame from your joint DataFrame, containing the budget per head for each department.

  2. Create a new DataFrame from your budget per head DataFrame, containing today’s date as a string.

Answers

from pyspark.sql.functions import col

department_data = department_data.withColumn('bph', col('budget') / col('employees'))
department_data.show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|              bph|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  4781|97.57142857142857|
## |    Head Office|        5|      null|  null|             null|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+
from pyspark.sql.functions import lit
department_data = department_data.withColumn('date', lit('1901-01-01'))
department_data.show()
## +---------------+---------+----------+------+-----------------+----------+
## |     department|employees|  division|budget|              bph|      date|
## +---------------+---------+----------+------+-----------------+----------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|1901-01-01|
## |Applied Science|       49|Scientific|  4781|97.57142857142857|1901-01-01|
## |    Head Office|        5|      null|  null|             null|1901-01-01|
## |    Methodology|       27| Technical|  4644|            172.0|1901-01-01|
## +---------------+---------+----------+------+-----------------+----------+

5.2 Remove a Column

Recall the Item_Price_DataFrame

Item_Price_DataFrame.show(6)
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

To remove a column from a DataFrame, use .drop('colname').

Item_Price_DataFrame.drop("Last_Year_Price").show()
## +-----------+----+-----+--------+
## |       Item|Code|Price|Quantity|
## +-----------+----+-----+--------+
## |Black Chair|  22|  100|      10|
## |White Table|   3|  500|      50|
## | Floor Lamp|  16|   60|       1|
## |White Table|   3|  500|      20|
## |      Couch|  12| 1000|       5|
## |White Table|   3|  500|      20|
## +-----------+----+-----+--------+

.drop() can drop multiple columns, if they are entered as comma separated strings, and not as a list.

Item_Price_DataFrame.drop("Last_Year_Price", "Price").show()
#Item_Price_DataFrame.drop(["Last_Year_Price", "Price"]).show() # RAISES AN ERROR
## +-----------+----+--------+
## |       Item|Code|Quantity|
## +-----------+----+--------+
## |Black Chair|  22|      10|
## |White Table|   3|      50|
## | Floor Lamp|  16|       1|
## |White Table|   3|      20|
## |      Couch|  12|       5|
## |White Table|   3|      20|
## +-----------+----+--------+

However, Python’s list-unpacking method can be used - by passing in a list preceded by an asterisk *, the function will treat every item in the list as a separate parameter - so .drop(*['Price','Item']) == .drop('Price', 'Item')

Item_Price_DataFrame.drop(*["Last_Year_Price", "Price"]).show()
## +-----------+----+--------+
## |       Item|Code|Quantity|
## +-----------+----+--------+
## |Black Chair|  22|      10|
## |White Table|   3|      50|
## | Floor Lamp|  16|       1|
## |White Table|   3|      20|
## |      Couch|  12|       5|
## |White Table|   3|      20|
## +-----------+----+--------+

Exercise

  1. Create a new DataFrame from the above DataFrame, with the date column from above dropped.

Answer

department_data = department_data.drop('date') 
department_data.show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|              bph|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  4781|97.57142857142857|
## |    Head Office|        5|      null|  null|             null|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+

5.3 Rename a Column

A column can be renamed by passing a string of the old name and new name into .withColumnRenamed(old_name, new_name).

Item_Price_DataFrame.withColumnRenamed("Last_Year_Price","Old_Price").show()
## +-----------+----+-----+---------+--------+
## |       Item|Code|Price|Old_Price|Quantity|
## +-----------+----+-----+---------+--------+
## |Black Chair|  22|  100|       70|      10|
## |White Table|   3|  500|      350|      50|
## | Floor Lamp|  16|   60|       50|       1|
## |White Table|   3|  500|      499|      20|
## |      Couch|  12| 1000|      900|       5|
## |White Table|   3|  500|      499|      20|
## +-----------+----+-----+---------+--------+

Bulk renames can be done by looping over the column names, and renaming each one.

Exercise

  1. Create a new DataFrame with the budget per head column renamed above to budget_per_head. (Or something else, if yours is already called budget_per_head!)

Answer

department_data = department_data.withColumnRenamed('bph', 'budget_per_head')
department_data.show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  4781|97.57142857142857|
## |    Head Office|        5|      null|  null|             null|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+

5.4 Sort on Columns

The order of records can be altered by using .orderBy(cols).

Item_Price_DataFrame.orderBy('Item').show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## |      Couch|  12| 1000|            900|       5|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            350|      50|
## |White Table|   3|  500|            499|      20|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

The keyword argument ascending can be added to determine whether the data is ordered in ascending or descending order: .orderBy(cols, ascending = True)

Item_Price_DataFrame.orderBy('Item', ascending = False).show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |White Table|   3|  500|            499|      20|
## |White Table|   3|  500|            350|      50|
## |White Table|   3|  500|            499|      20|
## | Floor Lamp|  16|   60|             50|       1|
## |      Couch|  12| 1000|            900|       5|
## |Black Chair|  22|  100|             70|      10|
## +-----------+----+-----+---------------+--------+

Exercise

  1. Show your joined DataFrame ordered by department in alphabetical order.

Answer

department_data.orderBy('department').show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |Applied Science|       49|Scientific|  4781|97.57142857142857|
## |    Head Office|        5|      null|  null|             null|
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+

5.5 Filter Data

5.5.1 Filter on a Condition

.filter(condition) will filter a DataFrame depending on the condition set. The example below shows only items whose price is above 600. .where can be used instead of .filter - they are aliases of one another. The filter condition can be written either in SQL, or using Python-based column comparators.

Item_Price_DataFrame.filter(Item_Price_DataFrame['Price'] > 600).show() #Python based command

#Below will have the same effect.
#Item_Price_DataFrame.filter('Price > 600').show() #SQL based command
#Item_Price_DataFrame.filter(Item_Price_DataFrame.Price > 600).show() #Python based command
## +-----+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----+----+-----+---------------+--------+
## |Couch|  12| 1000|            900|       5|
## +-----+----+-----+---------------+--------+
Item_Price_DataFrame.filter(Item_Price_DataFrame.Price.between(40,120)).show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## | Floor Lamp|  16|   60|             50|       1|
## +-----------+----+-----+---------------+--------+

Exercise

  1. Show only departments which have a budget/head of more than 100.

Answer

department_data.where(department_data['budget_per_head'] > 100).show()
## +-----------+---------+---------+------+-----------------+
## | department|employees| division|budget|  budget_per_head|
## +-----------+---------+---------+------+-----------------+
## |      Legal|       11|    Admin|  9656|877.8181818181819|
## |Methodology|       27|Technical|  4644|            172.0|
## +-----------+---------+---------+------+-----------------+
department_data.filter('budget_per_head > 100').show()
## +-----------+---------+---------+------+-----------------+
## | department|employees| division|budget|  budget_per_head|
## +-----------+---------+---------+------+-----------------+
## |      Legal|       11|    Admin|  9656|877.8181818181819|
## |Methodology|       27|Technical|  4644|            172.0|
## +-----------+---------+---------+------+-----------------+
department_data.where(department_data.budget_per_head > 100).show()
## +-----------+---------+---------+------+-----------------+
## | department|employees| division|budget|  budget_per_head|
## +-----------+---------+---------+------+-----------------+
## |      Legal|       11|    Admin|  9656|877.8181818181819|
## |Methodology|       27|Technical|  4644|            172.0|
## +-----------+---------+---------+------+-----------------+

Guidance

  • With PySpark there are always a few ways of doing things. All of the above will work absolutely fine!
  • The first method is my favourite, the second uses a more SQL-like syntax, and the third is similar to the first, but refers to columns by doing dataframe.column_name.
  • If using the third method - dataframe.column_name - make sure there are no spaces in the column name! This is good practice in general: never put spaces in column names!
  • Also if using the third method, make sure your column names don’t clash with any attributes of the DataFrame - for example, don’t call a column dtypes, because if you do department_data.where(department_data.dtypes > 10), it will throw an error as it tries to use the attribute rather than the column!

5.5.2 Filter on Multiple Conditions

Multiple conditions can be entered, via SQL or using Python. With SQL, all conditions should be in one string, separated by and or or. If using Python-style column comparison, each separate condition should be wrapped in brackets () and separated by & or | for and/or respectively.

Item_Price_DataFrame.filter((Item_Price_DataFrame['Price'] > 300) & (Item_Price_DataFrame['Code'] > 3)).show() #Python based command

#Below will have the same effect.
#Item_Price_DataFrame.filter('Price > 300 and Code > 3').show() #SQL based command
#Item_Price_DataFrame.filter((Item_Price_DataFrame.Price > 300) & (Item_Price_DataFrame.Code > 3)).show() #Python based command
## +-----+----+-----+---------------+--------+
## | Item|Code|Price|Last_Year_Price|Quantity|
## +-----+----+-----+---------------+--------+
## |Couch|  12| 1000|            900|       5|
## +-----+----+-----+---------------+--------+

Exercise

  1. Show only departments which have a budget/head of more than 100, and an employee count of less than 15.

Answer

department_data.filter((department_data['budget_per_head'] > 100) & (department_data['employees'] < 15)).show()
## +----------+---------+--------+------+-----------------+
## |department|employees|division|budget|  budget_per_head|
## +----------+---------+--------+------+-----------------+
## |     Legal|       11|   Admin|  9656|877.8181818181819|
## +----------+---------+--------+------+-----------------+
department_data.where('budget_per_head > 100 and employees < 15').show()
## +----------+---------+--------+------+-----------------+
## |department|employees|division|budget|  budget_per_head|
## +----------+---------+--------+------+-----------------+
## |     Legal|       11|   Admin|  9656|877.8181818181819|
## +----------+---------+--------+------+-----------------+

Guidance

  • Make sure you wrap each condition in a set of brackets if using the first method!
  • Don’t forget that if you use the pythonic method, use & to mean and and | to mean or. If using the SQL syntax, just use the words!

5.6 Make Corrections to Data

.when(condition, value_if_true) and .otherwise(value_if_false) can be used together to conditionally change values in a DataFrame. If the data has a known error, it can be corrected easily as below, by overwriting the column with a new value where the condition is True, and keeping the original column value where the condition is False.

Conditions can be based on other columns in the DataFrame, in which case the value of that column for each record is assessed. .when(condition, value_if_true) can be used without .otherwise(value_if_false); records that fail the condition are then filled with nulls.

from pyspark.sql.functions import when

Item_Price_DataFrame.withColumn('Price', when(Item_Price_DataFrame["Price"] == 100, 99.99).otherwise(Item_Price_DataFrame["Price"])).show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|99.99|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

Exercises

  1. A correction has come through for the Applied Science department budget, it should actually be 6781. Create a new DataFrame with the correction, and display it to check it has gone through.

  2. The budget per head is now incorrect, so redo the previous exercises to Create a Dataframe with the correction, and check which departments now have a budget/head of more than 100.

Answers

from pyspark.sql.functions import when

department_data_corrected = department_data.withColumn('budget', when(department_data['department'] == 'Applied Science', 6781).otherwise(department_data['budget']))

department_data_corrected.show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  6781|97.57142857142857|
## |    Head Office|        5|      null|  null|             null|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+

But this would also work for this specific example, as the Applied Science department is the only one with a previous budget of 4781:

department_data.withColumn('budget', when(department_data['budget'] == 4781, 6781).otherwise(department_data['budget']))

Once the corrected value is in, the budget per head can be recalculated:

department_data_corrected = department_data_corrected.withColumn('budget_per_head', col('budget') / col('employees'))
department_data_corrected.show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  6781|138.3877551020408|
## |    Head Office|        5|      null|  null|             null|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+

Guidance

  • Make sure you specify the right combination of columns! The first one, budget, is the column you want to make changes in. Usually, this will be the same as the one in the otherwise() section, as you want the default behaviour to be to keep the original data. In the conditional when() statement, use any column that you want to run the logic against!
  • Multiple when() statements can be chained together -
df.withColumn('name', when(condition_1, value_1).when(condition_2, value_2)...when(condition_x, value_x).otherwise(default_value))
  • This gives it the form of an if-else block of conditions!
  • When making small changes like this, be as specific as you can. The code above changes the Applied Science budget successfully, but what if there was an Applied Science department within a different division? That would also get its budget changed! Instead of querying on one column, query on as many as you can to be as specific as possible - you could always put the condition in a filter block first, to check that only the record you care about will be changed!
department_data.withColumn('budget', when(((department_data['department'] == 'Applied Science') & (department_data['budget'] == 4781) & (department_data['division'] == 'Scientific')),6781).otherwise(department_data['budget'])).show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  6781|97.57142857142857|
## |    Head Office|        5|      null|  null|             null|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+

5.7 Dealing with Missing Data - Nulls

5.7.1 Finding Nulls

The idea of null in Spark is similar to that in R and Python: as null represents the absence of data, it cannot be located with traditional conditions such as column == null. Instead, special methods must be used.

There is a simple command, .isNull() which will reveal any nulls.

To check if there are nulls in the Outer Join DataFrame, the below can be used.

Item_outer_join.filter(Item_outer_join["Price"].isNull()).show()

#using the SQL style method
#Item_outer_join.filter('Price is null').show()
## +--------+----+-----+---------------+--------+----+---------+
## |    Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +--------+----+-----+---------------+--------+----+---------+
## |Red Door|null| null|           null|    null|  14|       41|
## +--------+----+-----+---------------+--------+----+---------+

Likewise, to find records where the value is not null, .isNotNull() can be used.

Item_outer_join.filter(Item_outer_join["Price"].isNotNull()).show()
#Item_outer_join.filter('Price is not null').show()
## +-----------+----+-----+---------------+--------+----+---------+
## |       Item|Code|Price|Last_Year_Price|Quantity|Code|Defective|
## +-----------+----+-----+---------------+--------+----+---------+
## |Black Chair|  22|  100|             70|      10|  22|        4|
## |White Table|   3|  500|            350|      50|   3|       15|
## | Floor Lamp|  16|   60|             50|       1|null|     null|
## |White Table|   3|  500|            499|      20|   3|       15|
## |      Couch|  12| 1000|            900|       5|  12|        3|
## |White Table|   3|  500|            499|      20|   3|       15|
## +-----------+----+-----+---------------+--------+----+---------+

Exercise

  1. When we joined our two departmental DataFrames together, the department Head Office had some missing data. Use .filter() and .isNotNull() to create a new DataFrame containing only records where we have both employment and budget data.

Answer

department_data_no_null = department_data.filter((department_data['budget'].isNotNull() & department_data['employees'].isNotNull())) 
department_data_no_null.show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  4781|97.57142857142857|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+

5.7.2 Replacing Nulls

Nulls can be handled in two main ways: they can be replaced, or they can be dropped. There are two ways in which they can be replaced. The first, using the method in 5.6, is to use other columns to impute missing data. The second involves blanket replacing nulls with other values using .fillna().

.fillna(value, subset = None) can be used to blanket replace nulls with a specified value, over all columns if subset = None, or over a set of columns if subset = [cols]. The value can either be a single value for all columns, or a dictionary of column_name : value pairs. If a subset of columns is used, be careful that the fill value is of the same data type as the subsetted columns, else it will be ignored. If a dictionary of column_name : value pairs is used, then subset is ignored.

Item_Price_Nulls = spark.read.csv('./data/Item_Price_Nulls.csv', header = True)
Item_Price_Nulls.show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|    null|
## |White Table|null|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3| null|            499|      20|
## |      Couch|  12| null|            900|    null|
## |White Table|   3|  500|            499|      20|
## |       Sofa|null| null|           null|    null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.fillna('fill_value').show()
## +-----------+----------+----------+---------------+----------+
## |       Item|      Code|     Price|Last_Year_Price|  Quantity|
## +-----------+----------+----------+---------------+----------+
## |Black Chair|        22|       100|             70|fill_value|
## |White Table|fill_value|       500|            350|        50|
## | Floor Lamp|        16|        60|             50|         1|
## |White Table|         3|fill_value|            499|        20|
## |      Couch|        12|fill_value|            900|fill_value|
## |White Table|         3|       500|            499|        20|
## |       Sofa|fill_value|fill_value|     fill_value|fill_value|
## +-----------+----------+----------+---------------+----------+
Item_Price_Nulls.fillna(999, subset=['Code']).show()
#Doesn't work as column type of 'Code' is string, not int. 
#Subset ignores when column type doesn't match. 
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|    null|
## |White Table|null|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3| null|            499|      20|
## |      Couch|  12| null|            900|    null|
## |White Table|   3|  500|            499|      20|
## |       Sofa|null| null|           null|    null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.fillna('999', subset=['Code']).show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|    null|
## |White Table| 999|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3| null|            499|      20|
## |      Couch|  12| null|            900|    null|
## |White Table|   3|  500|            499|      20|
## |       Sofa| 999| null|           null|    null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.fillna({'Code': 999,
                         'Price': 0,
                         'Quantity': 0}).show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|       0|
## |White Table| 999|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|    0|            499|      20|
## |      Couch|  12|    0|            900|       0|
## |White Table|   3|  500|            499|      20|
## |       Sofa| 999|    0|           null|       0|
## +-----------+----+-----+---------------+--------+

.fillna(value) is an alias of .na.fill(value). Either can be used depending on personal preference.

Exercise

  1. Instead of just removing the Head Office record, let's fill it. Create a new DataFrame with the nulls filled, with division set to Admin, budget set to 6262, and budget_per_head set to 1252.4.

Answer


fill_dictionary = {'division' : 'Admin', 'budget' : 6262, 'budget_per_head' : 1252.4}

department_data.fillna(fill_dictionary)
## DataFrame[department: string, employees: bigint, division: string, budget: int, budget_per_head: double]

Guidance

  • If you are making a dictionary to replace the nulls, like in the above answer, make sure your column names match up! If your ‘budget_per_head’ column is called something else, use your name instead!
  • This blanket fill only works for our data, because there is only a single null row. If another department had missing data, this would not work as planned; it’d fill both records with the new values!

5.7.3 Dropping Nulls

To drop nulls, the above method of column.isNotNull() can be used, to return only records with non-null values in a specific column. .dropna() can also be used, and offers more functionality.

.dropna(how='any', thresh=None, subset=None) can be used to completely drop records containing nulls. The parameter how determines whether rows with any nulls are dropped, or only rows which are all null. If thresh is set to an integer, only records with fewer than thresh non-null values will be dropped, overriding the how parameter. As with fillna(), a subset of columns can be specified, excluding columns where null values aren’t an issue.

Item_Price_Nulls.show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|    null|
## |White Table|null|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3| null|            499|      20|
## |      Couch|  12| null|            900|    null|
## |White Table|   3|  500|            499|      20|
## |       Sofa|null| null|           null|    null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.dropna(how='any').show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.dropna(how='all').show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|    null|
## |White Table|null|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3| null|            499|      20|
## |      Couch|  12| null|            900|    null|
## |White Table|   3|  500|            499|      20|
## |       Sofa|null| null|           null|    null|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.dropna(how='all', subset = Item_Price_Nulls.columns[1:]).show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|    null|
## |White Table|null|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3| null|            499|      20|
## |      Couch|  12| null|            900|    null|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+
Item_Price_Nulls.dropna(thresh = 3).show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|    null|
## |White Table|null|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3| null|            499|      20|
## |      Couch|  12| null|            900|    null|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

As with .fillna(), the alias .na.drop(how) can be used instead of .dropna().

Exercise

  1. Repeat the exercise from 5.7.1 using .dropna() to remove the Head Office record from the joined employment-budget DataFrame.

Answers

department_data.dropna(how = 'any').show() #drop any record with any nulls
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  4781|97.57142857142857|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+
department_data.dropna('any').show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  4781|97.57142857142857|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+
department_data.dropna().show()
## +---------------+---------+----------+------+-----------------+
## |     department|employees|  division|budget|  budget_per_head|
## +---------------+---------+----------+------+-----------------+
## |          Legal|       11|     Admin|  9656|877.8181818181819|
## |Applied Science|       49|Scientific|  4781|97.57142857142857|
## |    Methodology|       27| Technical|  4644|            172.0|
## +---------------+---------+----------+------+-----------------+

Guidance

  • All of the above do the exact same thing, as the default value of the how parameter in dropna() is 'any' - drop any row with any nulls. As such, you can just do df.dropna() to drop the required record!

5.8 Drop Duplicates

When data has completely identical records, .dropDuplicates() can be used to leave only one copy of each record.

From this:

Item_Price_DataFrame.show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

To this:

Item_Price_DataFrame.dropDuplicates().show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |White Table|   3|  500|            350|      50|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## | Floor Lamp|  16|   60|             50|       1|
## |Black Chair|  22|  100|             70|      10|
## +-----------+----+-----+---------------+--------+

.dropDuplicates(), or the similar .distinct(), can be used to find the count of distinct records in one column.

print(Item_Price_DataFrame.select('Item').distinct().count())
## 4

Exercises

  1. What is the count of records in the DataFrame you created in 4.5 Unioning DataFrames - unioned_employees? What should the count be after dropping duplicates?

  2. Run .count() on the DataFrame before and after dropping duplicates, and compare the results to your expectations.

Answer

There should be 8 records in the unioned dataframe. After dropping duplicates, there should only be 4.

pre_count = unioned_employees.count()
dropped_employees = unioned_employees.dropDuplicates()
post_count = dropped_employees.count()
dropped_employees.show()
## +---------------+---------+
## |     department|employees|
## +---------------+---------+
## |    Head Office|        5|
## |Applied Science|       49|
## |          Legal|       11|
## |    Methodology|       27|
## +---------------+---------+
print('before:', pre_count, ' after:', post_count)
## before: 8  after: 4

5.9 Aggregate Data

Aggregating records together allows us to utilise data at a higher level. .groupBy() can take any number of columns, and will group the data for each unique subset of those columns.

For example, count how many items have each price:

Item_Price_DataFrame.groupBy("Price").count().show()
## +-----+-----+
## |Price|count|
## +-----+-----+
## |  100|    1|
## | 1000|    1|
## |   60|    1|
## |  500|    3|
## +-----+-----+

.agg() takes a dictionary of column names and methods of aggregating - sum, max, avg, std etc.

Item_Price_DataFrame.groupby('Item').agg({'Quantity':'sum'}).show()
## +-----------+-------------+
## |       Item|sum(Quantity)|
## +-----------+-------------+
## |Black Chair|         10.0|
## |      Couch|          5.0|
## | Floor Lamp|          1.0|
## |White Table|         90.0|
## +-----------+-------------+

Exercise

  1. Create a DataFrame with the total budget for each division using .groupby() and .agg(). Use the original budget dataframe, rather than the one we have been applying transformations to.

Answer

divisional_budgets = department_budget.groupby('division').agg({'budget':'sum'})
divisional_budgets.show()
## +-----------+------------+
## |   division| sum(budget)|
## +-----------+------------+
## |Engineering|       45665|
## |      Admin|       71289|
## | Scientific|       40068|
## |  Technical|       43904|
## +-----------+------------+

5.10 Summary Statistics

Summary overviews of columns can be found with .describe(). It can be run with no arguments to display details on all columns, or be passed a single column name, or a list of columns, to describe only those columns.

This returns the count of non-null values in each column, so it can be used to find columns which contain nulls by comparing the counts to the overall count of the DataFrame.

Item_Price_DataFrame.describe().show()
## +-------+-----------+-----------------+------------------+-----------------+------------------+
## |summary|       Item|             Code|             Price|  Last_Year_Price|          Quantity|
## +-------+-----------+-----------------+------------------+-----------------+------------------+
## |  count|          6|                6|                 6|                6|                 6|
## |   mean|       null|9.833333333333334| 443.3333333333333|394.6666666666667|17.666666666666668|
## | stddev|       null|8.134289561274953|341.85767018843774| 317.279477222632|17.625738755203045|
## |    min|Black Chair|               12|               100|              350|                 1|
## |    max|White Table|                3|                60|              900|                50|
## +-------+-----------+-----------------+------------------+-----------------+------------------+
Item_Price_DataFrame.describe('Code').show()
## +-------+-----------------+
## |summary|             Code|
## +-------+-----------------+
## |  count|                6|
## |   mean|9.833333333333334|
## | stddev|8.134289561274953|
## |    min|               12|
## |    max|                3|
## +-------+-----------------+

Note:

Because these columns are stored as strings, min/max compare them alphabetically rather than numerically: for the numeric-looking columns Code and Price, the results are the first and last values in alphabetical order (so '12' sorts before '3'). Mean and standard deviation cannot be computed on strings, so for those the columns are implicitly cast to numbers and the true results are found.
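The lexicographic behaviour is easy to reproduce in plain Python (this is an illustration of the ordering, not Spark code). Comparing the Code values as strings puts '12' first and '3' last, while comparing them as numbers gives the order you would expect:

```python
codes = ['22', '3', '16', '12']

# String comparison works character by character, so '12' < '16' < '22' < '3'
print(min(codes), max(codes))                    # 12 3

# Casting to numbers first gives the true numeric min and max
print(min(codes, key=int), max(codes, key=int))  # 3 22
```

This matches the describe() output above: the string column reports min 12 and max 3, while the cast column reports min 3.0 and max 22.0.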

Exercise

  1. Find the mean and standard deviation of the budgets at a departmental level.

Answer

department_budget.describe('budget').show()
## +-------+------------------+
## |summary|            budget|
## +-------+------------------+
## |  count|                33|
## |   mean| 6088.666666666667|
## | stddev|2606.9864267323424|
## |    min|               114|
## |    max|              9724|
## +-------+------------------+


5.11 Casting Column Types

When reading data in from an HDFS table, the schema of the table should correctly set the column types of the newly created DataFrame to those of the original table. When reading in from a CSV this is not true: every column will be read in as a string, unless the optional parameter inferSchema = True is set.

Incorrect column types can cause strange issues to appear when handling the column as a whole, or when finding summaries of columns. To fix this, .cast() can be used to convert columns from one data type to another. The required type is imported first from pyspark.sql.types, then a column is overwritten with a version converted to a Double.

print(Item_Price_DataFrame.dtypes)
## [('Item', 'string'), ('Code', 'string'), ('Price', 'string'), ('Last_Year_Price', 'string'), ('Quantity', 'string')]
from pyspark.sql.types import DoubleType

Item_Price_Cast = Item_Price_DataFrame.withColumn('Code', Item_Price_DataFrame['Code'].cast(DoubleType()))
print(Item_Price_Cast.dtypes)
## [('Item', 'string'), ('Code', 'double'), ('Price', 'string'), ('Last_Year_Price', 'string'), ('Quantity', 'string')]
Item_Price_Cast.describe('Code').show()
## +-------+-----------------+
## |summary|             Code|
## +-------+-----------------+
## |  count|                6|
## |   mean|9.833333333333334|
## | stddev|8.134289561274953|
## |    min|              3.0|
## |    max|             22.0|
## +-------+-----------------+

The data type is now double rather than string; the values themselves are unchanged (apart from the .0 added by the conversion to a double), and describe correctly shows the right min and max for the Code column.

Exercise

  1. You receive word that budgets will soon not be rounded to the nearest pound, so the integer column in your original department-budget DataFrame will need to be converted to a double. Create a new DataFrame with the budget column cast as a double, and then show the resulting DataFrame, and display the data types.

Note:

If your data is already a double, convert it into an integer!

Answer

from pyspark.sql.types import DoubleType

print(department_budget.dtypes)
## [('department', 'string'), ('division', 'string'), ('budget', 'int')]
department_budget_cast = department_budget.withColumn('budget', department_budget['budget'].cast(DoubleType()))

department_budget_cast.show(5)
## +-------------+---------+------+
## |   department| division|budget|
## +-------------+---------+------+
## |           HR|    Admin|7852.0|
## |      Finance|    Admin|8541.0|
## |        Legal|    Admin|9656.0|
## |International|    Admin|1913.0|
## |  IT Services|Technical|7420.0|
## +-------------+---------+------+
## only showing top 5 rows
print(department_budget_cast.dtypes)
## [('department', 'string'), ('division', 'string'), ('budget', 'double')]

Guidance

  • As an alternative to importing data types, you can pass into .cast() the name of the datatype as a string:
department_budget.withColumn('budget', department_budget['budget'].cast('double')).show(5)
## +-------------+---------+------+
## |   department| division|budget|
## +-------------+---------+------+
## |           HR|    Admin|7852.0|
## |      Finance|    Admin|8541.0|
## |        Legal|    Admin|9656.0|
## |International|    Admin|1913.0|
## |  IT Services|Technical|7420.0|
## +-------------+---------+------+
## only showing top 5 rows


5.12 Intended Learning Outcomes

Now, you should be able to perform Data querying and manipulation. You should be able to:

  1. Remove, Add, Rename a column

  2. Filter on a condition of a column and on multiple columns

  3. Replace values dependent on conditions

  4. Check for nulls

  5. Drop Duplicates

  6. Aggregate Data

  7. Do Summary Statistics


6 Save DataFrames to Files


Intended Learning Outcomes:

By the end of this Chapter, you should be able to:

  1. Use .repartition() and .coalesce() to obtain the optimal partitioning for your DataFrame and the cores you have available. You need to understand what the optimal partitioning is.

  2. Save data in a json or csv format.


6.1 Partitioning

If you are using a relatively small set of data, partitioning can generally be left to Spark to handle. If, however, you have a very large set of data, proper partitioning is the key difference between your analysis succeeding promptly, and your processes crashing as soon as they get a glimpse of the data.

For large sets of data, the importance of partitioning cannot be overstated. Within Databricks Community Edition, you have access to 8 cores. Within the ONS Data Service, you can specify the number of cores you require. When thinking about partitioning, the primary aim should be to have our data equally distributed between the cores.

If we have 8 cores and 1 million records, it stands to reason that we want 125,000 records on each core. Unbalanced partitioning can lead to a massive slowdown. We want the number of partitions to be a multiple of the number of cores, to ensure that every core is being used equally.

In the first diagram below, if we assume that each partition takes 10 seconds for a task to happen, the total time will be 20 seconds - 10 seconds for the first 8 partitions, and 10 seconds for the last 2. During the last 2, 6 cores are sitting idle.

In the second diagram, with half as many partitions, each twice as large as before, the processing will still take 20 seconds, but 3 cores will sit idle for the entire time.

In the final diagram, with the data equally split into 8 partitions, each partition will take 12.5 seconds to process. As each core has only 1 partition, the overall processing time will be 12.5 seconds, with no idle cores.

In general, around 200MB per partition is recommended. When considering whether to reduce or increase the number of partitions to reach a multiple of the number of cores, consider the resulting size per partition and use that to decide.
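The arithmetic above can be sketched as a small helper. This is a hypothetical illustration, not part of the Spark API: it aims for roughly 200MB per partition, then rounds up to a multiple of the available cores so every core receives the same number of partitions.

```python
import math

def suggest_partitions(total_size_mb, n_cores, target_mb_per_partition=200):
    """Suggest a partition count: aim for ~target_mb_per_partition per
    partition, then round up to the nearest multiple of n_cores so that
    every core processes the same number of partitions."""
    raw = max(1, math.ceil(total_size_mb / target_mb_per_partition))
    return math.ceil(raw / n_cores) * n_cores

# 3000 MB of data on 8 cores: 3000/200 = 15 partitions, rounded up to 16
print(suggest_partitions(3000, 8))  # 16
```

The result could then be passed to .repartition() or .coalesce() as appropriate.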

If you are unaware of the size of your DataFrame, you can look at the original data stored within HUE, which should be able to tell you the rough filesize. Or, you can cache the DataFrame, and use the Spark UI to find the size of the cached data.

To cache the data, do the following:

Item_Price_DataFrame.cache().count()
## 6

The count() action is needed to make the cache() transformation occur.

Then, navigate to the Spark UI, and go to the Storage tab. You should be able to see the DataFrame you cached, as well as the number of partitions it is stored in, and the amount of memory it takes up. You can use this to decide on a more useful number of partitions.

The Spark UI can be found within databricks by clicking on the currently attached cluster button in a notebook, and selecting Spark UI. Within DAP, open a CDSW workbench session, create a SparkSession object, then click the icon in the top right of the workbench, with a three-by-three grid of rectangles on it, and select Spark UI from the dropdown menu.

6.2 .repartition()

repartition(n) will return a new DataFrame with n partitions. It can both increase and decrease the number of partitions.

dataframe.repartition(8), for example, will convert the DataFrame's current number of partitions, whatever it is, into 8 partitions, with a relatively uniform distribution of data across them.

.repartition(n, cols) will repartition a DataFrame into n partitions, but also keep records with the same values in cols on the same partition. For example, if you repartition by a Year column, all records with the same Year value will be placed on the same partition. With a bit of thought about the transformations to be performed on the data later, partitioning well at an early stage can bring efficiency gains further down the processing pipeline.

6.3 .coalesce()

coalesce(n) will return a DataFrame with n partitions, or the current number of partitions, whichever is lower. As such, it can only be used to reduce the number of partitions.

It cannot guarantee an even distribution of records across all partitions, as it does not shuffle data; it only merges existing partitions. Using repartition to reduce the number of partitions will instead result in a more even distribution of data, at the cost of an expensive shuffle operation - moving data between partitions.
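The difference in distribution can be illustrated with a small pure-Python model of partitions (an illustration only - this is not Spark code, and real coalesce groups partitions by locality rather than round-robin):

```python
def coalesce_model(partitions, n):
    """Merge whole partitions into n groups without moving individual
    records - mimicking how .coalesce() avoids a shuffle."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)
    return merged

def repartition_model(partitions, n):
    """Redistribute every individual record across n partitions -
    mimicking the even distribution a full .repartition() shuffle gives."""
    records = [rec for part in partitions for rec in part]
    return [records[i::n] for i in range(n)]

parts = [[1, 2, 3, 4], [5, 6], [7], [8]]  # 4 uneven partitions
print([len(p) for p in coalesce_model(parts, 2)])     # [5, 3] - still uneven
print([len(p) for p in repartition_model(parts, 2)])  # [4, 4] - balanced
```

Merging whole partitions is cheap but inherits any existing imbalance; shuffling individual records is expensive but balanced.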

6.4 .rdd Methods

There are some especially useful RDD methods for inspecting partitioning: df.rdd.getNumPartitions() returns the number of partitions, and df.rdd.glom().map(len).collect() gives a breakdown of the number of records on each partition.

print(Item_Price_DataFrame.rdd.getNumPartitions())
## 1

df.rdd.glom().map(len).collect() is less obvious. .glom() is used to treat entire partitions as arrays. Any function applied to a glommed RDD will apply partition-by-partition, so when we map the len (length) of each partition, and collect the result, we get the length of each partition returned as a list.

Item_Price_DataFrame_3Part = Item_Price_DataFrame.repartition(3)
print(Item_Price_DataFrame_3Part.rdd.glom().map(len).collect())
## [2, 2, 2]

6.5 Save DataFrames to Files

6.5.1 Save as a CSV

Calling .write.csv(filepath, sep, mode) will write a DataFrame to the specified location filepath, using sep as the column separator. Setting mode to 'overwrite' allows pyspark to write multiple times, discarding the existing version each time.

By default this will write to as many CSV files as you have partitions. This can be avoided by coalescing the data into one partition first, then writing to CSV. Be warned - all your data will be brought to a single partition, so it must be small enough to fit on one.

The filepath listed will be a directory, inside of which will be the actual data in as many CSV files as you have partitions, each file starting with the word ‘part’, and 3 meta-data files, stating whether the write process has been successful, when it began, and when it was last modified.

However, Spark is intelligent enough that read.csv() can take the directory path alone, read in every part file within it, and return the entire collection of data as a partitioned DataFrame. This is useful if you want to store a DataFrame temporarily for reuse within Pyspark, but having data split into multiple files is less useful for external use.

Item_Price_DataFrame.write.csv('/folder/Item_Price_csv', sep = ',', mode = 'overwrite') #writes to the same number of csv files as partitions

Item_Price_DataFrame.coalesce(1).write.csv('Item_Price_Coalesced', sep = ',', mode = 'overwrite') #coalesces to one partition, writes to one csv.

Exercise

  1. Save your department_employees DataFrame as a CSV file, within the directory intro_pyspark. Ensure it is on one partition first.

Answer

Databricks has no convenient way of viewing data that is not stored in a table. Instead, all files can be viewed by pressing the Data icon on the left bar, going to Add Data as you did to upload files previously, but then clicking on the DBFS tab rather than remaining on the default Upload File tab. From here, all the files can be found. Uploaded CSV files will be in FileStore/tables/filename.csv, while files you have saved will be at the top level - or follow any directory structure you give them. The answer below, for example, will store the results in a folder called intro_pyspark.

department_employees.coalesce(1).write.csv('intro_pyspark/department_employees_csv', mode = 'overwrite')

Compare the results of this to if you do:

department_employees.write.csv('intro_pyspark/department_employees_8_csv', mode = 'overwrite')

Guidance

  • When you click into one of the directories of data you have saved above, you should see three metadata files (specified by an initial underscore - _). These mark the start of the write process (_started_), the last change to the write process (_committed_), and the overall status (hopefully _SUCCESS).
  • As well as these metadata files, there will be part files, with incrementing ids. These represent single partitions of the original DataFrame.
  • When you coalesce to one partition, you should see one part file.
  • When you don’t coalesce, you have written out the original number of partitions - e.g. 8 for the specific DataFrame.
    • However, if your DataFrame has empty partitions, at most only one of those will be written out; there is no benefit to writing out 4, or 40 completely empty partitions.
    • As such, while the original department_employees DataFrame has 8 partitions for its 4 records, there should only be 5 part files - one per record, and one empty partition.

6.5.2 Save as a json file

Calling .write.save(file, format) on a DataFrame will save it to the specified location in the specified format. The example below writes the DataFrame to the file location in json format. As with CSVs, coalescing to 1 partition first will ensure you get one output json file, rather than a number equal to the number of partitions.

Item_Price_DataFrame.coalesce(1).write.save("Item_Price_json",format="json")

6.6 Write to a Table

There are many different ways of writing data out to a table, with a huge amount of parameters which can be played with. Different teams will have different preferred ways of writing data out, some of which are shown below.

6.6.1 Writing via SQL strings

To write using raw SQL commands, the destination table must first exist, and then can be written into.

sql_create_table = 'CREATE TABLE IF NOT EXISTS item_price_data(Item varchar(255), Code varchar(255), Price varchar(255), LastYearPrice varchar(255), Quantity varchar(255))'

spark.sql(sql_create_table)

Once an empty table has been created, it can be populated by creating a temporary view of your data using .createOrReplaceTempView('tablename'), and then writing that to the table.

Item_Price_DataFrame.createOrReplaceTempView('tempTableView')
sql_write_data = 'INSERT OVERWRITE TABLE item_price_data SELECT * FROM tempTableView'

spark.sql(sql_write_data)

INSERT OVERWRITE from the SQL above will replace any existing data with the new data.

6.6.2 Writing via Python

A second way of writing to tables uses a similar format to writing a CSV: df.write.format().mode().saveAsTable().

Within the format(), pass in a string referring to how the underlying data should be formatted (e.g. parquet, avro, json), within mode() pass in a string referring to how the new data should be inserted (e.g. append, overwrite), and within the saveAsTable(), pass in the name of the desired table.

Item_Price_DataFrame.write.format('parquet').mode('overwrite').saveAsTable('item_price_data') #Overwrite existing data. Schema does not need to match, new data schema will overwrite existing. 

Note:

Parquet: Optimised for efficient queries, data parsing.

Avro: Good for large binary data, or where data users want access to entire single records at once.

JSON: Good for when data is distributed across a lot of small files, aggregate data.

More Info

Exercise

  1. Write your department-employees DataFrame to a new HDFS table using one of the methods above. Use the method from before for importing from an SQL table to read the data back in and check it is the same.

Answer

Method 1 - SQL:

create_table = 'CREATE TABLE IF NOT EXISTS department_employees_sql(department VARCHAR(255), employees INTEGER)'
store_data = 'INSERT OVERWRITE TABLE department_employees_sql SELECT * FROM temp_table_view'

spark.sql(create_table)
department_employees.createOrReplaceTempView('temp_table_view')
spark.sql(store_data)

Guidance

  • In the above answer, I create the two SQL strings and store them as variables, before I run them all at once, including the creation of the temporary table. I haven’t changed the number of partitions in either method, because I want the resulting table to have the same partition structure.

Method 2 - Python:

# overwrite existing data, store in table 'department_employees'
department_employees.write.format('parquet').mode('overwrite').saveAsTable('department_employees') 

6.6.3 Writing to Tables and Repartitioning

A written HDFS table should preserve the number of partitions in the data being written to it - repartitioning before writing should set the number of partitions and distribution of data, and preserve that in the HDFS table. Reading the data back in should see the same distribution, but empty partitions might be lost as seen in the diagram below, which shows some of the vagaries of repartitioning.



Appending data to an existing partitioned HDFS table might, however, not work exactly as expected: partitions might not hold the exact records expected once the new data is combined with the existing table data. To get around this, the original data can be read in, unioned in pyspark with the new data, repartitioned to force the desired partitioning, and the original table overwritten.


6.7 Intended Learning Outcomes

Now, you should be able to:

  1. Use .repartition() and .coalesce() to obtain the optimal partitioning for your DataFrame and the cores you have available. You need to understand what the optimal partitioning is.

  2. Save data in a json or csv format.


7 Data Visualization


Intended Learning Outcomes:

By the end of this Chapter, you should be able to:

  1. Take a sample from the Data

  2. Convert Spark DataFrames into pandas DataFrame

  3. Make a simple scatterplot in Python.


Remember, we are using Spark because the data is really big. To visualize the data we need to take a sample of it and convert that sample to a pandas DataFrame. The reason for sampling first is that converting to a pandas DataFrame brings all the data back to a single core, and the full dataset is too big to allow for that. So we take a sample of the data, convert it to pandas, and then work as we normally would in traditional Python.
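A rough way to choose the fraction for the sample is to divide the number of rows you can comfortably hold on the driver by the total row count. The helper below is an illustration, not part of the Spark API, and .sample() is approximate, so the returned row count is a guideline rather than a guarantee:

```python
def sample_fraction(total_rows, target_rows):
    """Rough fraction to pass to .sample() so that approximately
    target_rows rows come back; capped at 1.0 for small DataFrames."""
    return min(1.0, target_rows / total_rows)

# 10 million rows, of which only ~100,000 comfortably fit on the driver
print(sample_fraction(10_000_000, 100_000))  # 0.01
```

The result would then be passed as the fraction argument of .sample(), introduced below.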

7.1 Take a sample

The Item_Price_DataFrame is shown below:

Item_Price_DataFrame.show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## |White Table|   3|  500|            350|      50|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## |      Couch|  12| 1000|            900|       5|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

.sample(withReplacement, fraction, seed=None): returns a sampled subset of the DataFrame. It does not guarantee to return exactly the specified fraction of the total row count.

withReplacement: if True, a row can be sampled more than once; if False, each row can appear in the sample at most once.

fraction: sampling fraction of the data

seed : random seed

Item_Price_DataFrame_Sample = Item_Price_DataFrame.sample(withReplacement = False, fraction = 0.5, seed = 1)
Item_Price_DataFrame_Sample.show()
## +-----------+----+-----+---------------+--------+
## |       Item|Code|Price|Last_Year_Price|Quantity|
## +-----------+----+-----+---------------+--------+
## |Black Chair|  22|  100|             70|      10|
## | Floor Lamp|  16|   60|             50|       1|
## |White Table|   3|  500|            499|      20|
## +-----------+----+-----+---------------+--------+

7.2 Convert to pandas DataFrame

.toPandas(): returns the contents of the DataFrame as a pandas DataFrame. It is only available if pandas is installed.

Item_Price_Pandas_DataFrame = Item_Price_DataFrame_Sample.toPandas()

print(Item_Price_Pandas_DataFrame)
##           Item Code Price Last_Year_Price Quantity
## 0  Black Chair   22   100              70       10
## 1   Floor Lamp   16    60              50        1
## 2  White Table    3   500             499       20

Exercise

  1. Take a sample from the original budget DataFrame and then convert it into a pandas DataFrame. Show the outcome - use print(df_pd) or df_pd.head().

Answer

sample_budget = department_budget.sample(False, 0.5, 1)
budget_pandas = sample_budget.toPandas()
print(budget_pandas)
##               department     division  budget
## 0                     HR        Admin    7852
## 1                  Legal        Admin    9656
## 2          International        Admin    1913
## 3                    R&D   Scientific    8389
## 4    Systems Engineering  Engineering    6564
## 5                Transit  Engineering    7069
## 6               Autonomy  Engineering    4060
## 7                    Air  Engineering    8712
## 8               Maritime  Engineering    8480
## 9       Customer Support        Admin    8293
## 10             Executive        Admin    6967
## 11          Data Science   Scientific    7603
## 12  Radio Communications   Scientific    3370
## 13   Economic Statistics    Technical    9629
## 14             Marketing        Admin    4989
## 15                Survey    Technical    4665
## 16         Deep Learning   Scientific    8420
## 17              Graphics    Technical    9724

7.3 Plot

Different methods are needed in DAP and Databricks.

DAP

import matplotlib.pyplot as plt
plt.scatter(Item_Price_Pandas_DataFrame['Price'], Item_Price_Pandas_DataFrame['Last_Year_Price'])
plt.xlabel('Price')
plt.ylabel('Last Year Price')
plt.title('Price Vs Last Year Price')

plt.show()

Databricks

import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(Item_Price_Pandas_DataFrame['Price'], Item_Price_Pandas_DataFrame['Last_Year_Price'], 'ko')

display(fig)


7.4 Intended Learning Outcomes

Now, you should be able to:

  1. Take a sample from the Data

  2. Convert Spark DataFrames into pandas DataFrame

  3. Make a simple scatterplot in Python.


8 Spark in the ONS Data Service


Intended Learning Outcomes:

By the end of this Chapter, you should understand:

  1. How to create a SparkSession in the ONS Data Service to be able to run pyspark commands.


The ONS Data Service contains multiple interconnected systems for the storage and manipulation of data, of which Cloudera Data Science Workbench (CDSW) forms a core part. CDSW acts as an Integrated Development Environment (IDE) for Python, R and Scala development, and connects to the required number of executors for distributed processing.

Data can be found within HUE (Hadoop User Experience) which acts as an interface for the data stored within the ONS Data Service, from CSV files to database tables.

When a session is started within the workbench, an engine profile is specified, determining the number of vCPUs and the memory of the driver. As resources are finite, it is recommended to use the smallest profile that will fulfill your requirements. From within the user's code, a SparkSession can be created, with the number and size of executors specified. This can then be used to read, process, and export data from within HUE.

To fully utilise the capabilities of distributed programming, it is generally preferable to use a large number of smaller executors, rather than a smaller number of large ones.

An example SparkSession can be found below, with a dynamic number of executors, each with 4 cores and 4 GB of memory.

from pyspark.sql import SparkSession

spark = (
  SparkSession.builder.appName('project_name')
  .config('spark.executor.memory', '4g')
  .config('spark.executor.cores', 4)
  .config('spark.shuffle.service.enabled', 'true')
  .config('spark.dynamicAllocation.enabled', 'true')
  .config('spark.dynamicAllocation.maxExecutors', 8)
  .enableHiveSupport()
  .getOrCreate()
)

.appName() allows for a name to be defined for the SparkSession to differentiate it from others.

spark.executor.memory is a config setting which defines the amount of memory per executor process.

spark.executor.cores defines the number of cores available for the executor process.

.enableHiveSupport() allows connectivity to Hive’s store of metadata - Hive is a database within HUE.

.getOrCreate() will either create the specified SparkSession, or get an existing SparkSession.

Full details of config settings can be found here: http://spark.apache.org/docs/latest/configuration.html.

As mentioned above in the data ingest section, pathing for files is important within the ONS Data Service. For SQL tables, a simple database_name.table_name will do, while for CSV or JSON files, the full path below is recommended.

data_from_HUE_table = spark.sql('SELECT * FROM database_name.table_name')
data_from_CSV_file = spark.read.csv('hdfs://prod1/dapsen/landing_zone/ons/mbs/mbs_120219.csv', header=True)


8.1 Intended Learning Outcomes

Now, you should understand:

  1. How to create a SparkSession in the ONS Data Service


9 Case-Study (Think - Discuss - Do)


Intended Learning Outcomes:

By the end of this Chapter you should:

  1. Feel confident to deal with Data using pyspark.

  2. Be able to combine what we learnt in the previous Chapters to investigate and manipulate Data.

  3. Be able to investigate the current partitioning of your DataFrame, calculate the optimal partition based on your DataFrame and your resources and make the necessary changes to achieve that.


9.1 Case Study 1

Briefing

You have been given two subsets of a huge dataset that contains the Travel to Work Areas for the United Kingdom. You have been assigned the task of providing some basic descriptive analysis for the Travel to Work Areas, including the number of Lower Super Output Areas within each Travel to Work Area.

The main aim is to find the number of Small Areas within each Travel to Work Area.

You may need to consider the following:

  1. What the two subsets are and how to join them.

  2. What kind of join you want to do and why.

  3. Check for any duplicates and/or missing values. Can any missing values be imputed?

Note:

Any errors, duplicated data, or missing values have been added as part of this course and do not reflect the state of the original data.

Travel_to_Work_Areas: Defined as an area in which the working population is over 3,500, over 75% of those who work in the area live in the area, and over 75% of those who live in the area work in the area.

Small_Area_Code: Lower Layer Super Output Area (LSOA) for England and Wales, Data Zone for Scotland, Super Output Area for Northern Ireland

Small_Area_Name: Lower Layer Super Output Area (LSOA) for England and Wales, Data Zone for Scotland, Super Output Area for Northern Ireland

SOAs are a geography hierarchy designed to improve the reporting of small-area statistics. In England and Wales Lower Layer SOAs (LSOA) with a minimum population of 1,000 and Middle Layer SOAs (MSOA) with a minimum population of 5,000 were introduced in 2004. LSOAs are of consistent size across the country and won’t be subject to regular boundary change. A decision was made not to create an Upper Layer in England, while in Wales an Upper Layer (USOA) was created. In Northern Ireland there is a single layer of SOAs, with a minimum population of 1,300. The Scottish equivalents of SOAs are Data Zones (DZ) with a minimum population of 500 and Intermediate Zones (IZ) with a minimum population of 2,500.

Area_Number: 1 to 228

Area_Code: Travel-To-Work-Area Code

Area_Name: Travel-To-Work-Area Name

Note:

  1. The original dataset can be found here

  2. Glossary for the dataset can be found here

Example Answer

Read in and look at the data

By reading in and showing the data, we can get a feel for the data that is held in the files.

ttw_areas = spark.sql('SELECT * FROM travel_to_work_areas')
ttw_areas.show(10)
## +---------------+-----------+---------+---------+
## |Small_Area_Code|Area_Number|Area_Code|Area_Name|
## +---------------+-----------+---------+---------+
## |       95AA01S1|         18|N12000002|  Belfast|
## |       95AA01S2|         18|N12000002|  Belfast|
## |       95AA01S3|         18|N12000002|     null|
## |       95AA02W1|         18|N12000002|  Belfast|
## |       95AA03W1|         18|N12000002|     null|
## |       95AA04W1|         18|N12000002|  Belfast|
## |       95AA05W1|          9|N12000001|Ballymena|
## |       95AA06S1|         18|N12000002|  Belfast|
## |       95AA06S2|         18|N12000002|  Belfast|
## |       95AA07W1|          9|N12000001|Ballymena|
## +---------------+-----------+---------+---------+
## only showing top 10 rows
print(ttw_areas.dtypes)
## [('Small_Area_Code', 'string'), ('Area_Number', 'int'), ('Area_Code', 'string'), ('Area_Name', 'string')]
ttw_small_areas = spark.sql('SELECT * FROM travel_to_work_small_areas')
ttw_small_areas.show(10)
## +---------------+----------------+
## |Small_Area_Code| Small_Area_Name|
## +---------------+----------------+
## |       95AA01S1|    Aldergrove 1|
## |       95AA01S2|    Aldergrove 2|
## |       95AA01S3|    Aldergrove 3|
## |       95AA02W1|          Balloo|
## |       95AA03W1|     Ballycraigy|
## |       95AA04W1|           Clady|
## |       95AA05W1|       Cranfield|
## |       95AA06S1|Crumlin 1 Antrim|
## |       95AA06S2|Crumlin 2 Antrim|
## |       95AA07W1|      Drumanaway|
## +---------------+----------------+
## only showing top 10 rows
print(ttw_small_areas.dtypes)
## [('Small_Area_Code', 'string'), ('Small_Area_Name', 'string')]


Check for Duplicates

If there are duplicates in the data, we want to catch them early. Let's count the data, drop the duplicates, and count again. If the counts differ, then duplicates existed in the original data, but they have now been removed.

print(ttw_areas.count(), ttw_small_areas.count())
## 453 453
ttw_areas_temp = ttw_areas.dropDuplicates()
ttw_small_areas_temp = ttw_small_areas.dropDuplicates()

print(ttw_areas_temp.count(), ttw_small_areas_temp.count())
## 452 452

As there were duplicates, we can overwrite our original data with the temp DataFrames with the duplicates removed.

ttw_areas = ttw_areas_temp
ttw_small_areas = ttw_small_areas_temp


Join the Data

There is one column common to both DataFrames - Small_Area_Code - so we will join on that. From inspection, it looks like matching records are found in both DataFrames, so the type of join shouldn't matter. However, we will do a left join, keeping all data in travel_to_work_areas, as that data has some meaning without the small-area data, while the opposite is not true.

ttw_joined = ttw_areas.join(ttw_small_areas, on = 'Small_Area_Code', how = 'left')

print(ttw_joined.count())
## 452

This also has the correct number of records, a good sign!


Nulls

Let's look for nulls. .describe() will return the count of non-null values in each column. If that differs from the count found above, we know there are nulls in that column.

ttw_joined.describe().show()
## +-------+---------------+------------------+---------+-------------------+---------------+
## |summary|Small_Area_Code|       Area_Number|Area_Code|          Area_Name|Small_Area_Name|
## +-------+---------------+------------------+---------+-------------------+---------------+
## |  count|            452|               452|      451|                448|            452|
## |   mean|           null| 33.84070796460177|     null|               null|           null|
## | stddev|           null|29.214040414996887|     null|               null|           null|
## |    min|       95AA01S1|                 9|N12000001|          Ballymena|     Abbey Park|
## |    max|       95LL26S2|               152|N12000009|Newry and Banbridge|      Wynchurch|
## +-------+---------------+------------------+---------+-------------------+---------------+

We can see that there are nulls in two columns - Area_Code and Area_Name, with 1 and 4 nulls respectively. Let's have a look at these records:

ttw_joined.where((ttw_joined['Area_Code'].isNull() | ttw_joined['Area_Name'].isNull())).show()
## +---------------+-----------+---------+---------+----------------+
## |Small_Area_Code|Area_Number|Area_Code|Area_Name| Small_Area_Name|
## +---------------+-----------+---------+---------+----------------+
## |       95CC09S2|         63|N12000005|     null| Hamiltonsbawn 2|
## |       95AA01S3|         18|N12000002|     null|    Aldergrove 3|
## |       95AA10W1|         18|     null|  Belfast|Greystone Antrim|
## |       95CC07W1|         63|N12000005|     null|      Derrynoose|
## |       95AA03W1|         18|N12000002|     null|     Ballycraigy|
## +---------------+-----------+---------+---------+----------------+


Impute if possible

Can we use any data from other columns to impute our missing values?

When we displayed the first 10 records above, it looked like there was a relationship between Area_Code and Area_Name - hopefully a one-to-one relationship. Let's take a look, running a few aggregates to check.

ttw_joined.where(ttw_joined['Area_Name'] == 'Belfast').groupby('Area_Code').count().show()
## +---------+-----+
## |Area_Code|count|
## +---------+-----+
## |     null|    1|
## |N12000002|  268|
## +---------+-----+
ttw_joined.where(ttw_joined['Area_Code'] == 'N12000002').groupby('Area_Name').count().show()
## +---------+-----+
## |Area_Name|count|
## +---------+-----+
## |  Belfast|  268|
## |     null|    2|
## +---------+-----+
ttw_joined.where(ttw_joined['Area_Code'] == 'N12000005').groupby('Area_Name').count().show()
## +---------+-----+
## |Area_Name|count|
## +---------+-----+
## |Craigavon|   67|
## |     null|    2|
## +---------+-----+

This shows us that we should be able to impute the missing data (if in doubt, ask a subject matter expert who knows the data well). The filtered data above shows that every Belfast record has an Area_Code of N12000002, apart from the nulls, and likewise for the two codes and their respective Area_Names.

Now that we know what to fill our nulls with, we can do so. There are many ways of doing this: a mapper could be created capturing the one-to-one relationship between Area_Name and Area_Code, joined onto the original data, and the new columns combined with the old. In this case, because we have so few values to impute, .when().otherwise() can be used three times to fix the missing values.

from pyspark.sql.functions import when

ttw_joined = ttw_joined.withColumn('Area_Code', when(ttw_joined['Area_Name'] == 'Belfast', 'N12000002').otherwise(ttw_joined['Area_Code']))
ttw_joined = ttw_joined.withColumn('Area_Name', when(ttw_joined['Area_Code'] == 'N12000002', 'Belfast').otherwise(ttw_joined['Area_Name']))
ttw_joined = ttw_joined.withColumn('Area_Name', when(ttw_joined['Area_Code'] == 'N12000005', 'Craigavon').otherwise(ttw_joined['Area_Name']))

Let us check that this has fixed the null values.

ttw_joined.where(ttw_joined['Area_Name'] == 'Belfast').groupby('Area_Code').count().show()
## +---------+-----+
## |Area_Code|count|
## +---------+-----+
## |N12000002|  271|
## +---------+-----+
ttw_joined.where(ttw_joined['Area_Code'] == 'N12000002').groupby('Area_Name').count().show()
## +---------+-----+
## |Area_Name|count|
## +---------+-----+
## |  Belfast|  271|
## +---------+-----+
ttw_joined.where(ttw_joined['Area_Code'] == 'N12000005').groupby('Area_Name').count().show()
## +---------+-----+
## |Area_Name|count|
## +---------+-----+
## |Craigavon|   69|
## +---------+-----+

There are no nulls, and the counts are as expected. One last describe will check the overall column status.

ttw_joined.describe().show()
## +-------+---------------+------------------+---------+-------------------+---------------+
## |summary|Small_Area_Code|       Area_Number|Area_Code|          Area_Name|Small_Area_Name|
## +-------+---------------+------------------+---------+-------------------+---------------+
## |  count|            452|               452|      452|                452|            452|
## |   mean|           null| 33.84070796460177|     null|               null|           null|
## | stddev|           null|29.214040414996887|     null|               null|           null|
## |    min|       95AA01S1|                 9|N12000001|          Ballymena|     Abbey Park|
## |    max|       95LL26S2|               152|N12000009|Newry and Banbridge|      Wynchurch|
## +-------+---------------+------------------+---------+-------------------+---------------+


Answer the Question

The original question, beyond all the cleaning, is to display the counts of each small area within each travel to work area. This can be done by a simple groupby.

ttw_joined.groupby('Area_Name').count().show()
## +--------------------+-----+
## |           Area_Name|count|
## +--------------------+-----+
## |           Ballymena|   37|
## |           Craigavon|   69|
## |             Belfast|  271|
## |           Dungannon|    2|
## |           Coleraine|   43|
## | Newry and Banbridge|   14|
## |Cookstown and Mag...|   16|
## +--------------------+-----+

9.2 Case Study 2

Briefing

Given the dataset, investigate whether the current partitioning of the DataFrames is efficient, so that we are not wasting resources. Justify and perform any action as needed.

What happens if you coalesce to 8 partitions, or repartition to 2000? How long do simple actions take once the data has been transformed in this way?

Hint:

One of the main reasons to use RDDs is when you need finer control of your data. As each DataFrame is built on RDDs, you can access all RDD methods by calling df.rdd.method_name.

There are two especially useful RDD methods here: find the number of partitions with df.rdd.getNumPartitions(), and get a breakdown of the number of records in each partition with df.rdd.glom().map(len).collect()

Example Answer

Introduction

Databricks cells state at the bottom how long they take to run. As such, a command - such as .count() - can be run in a new cell and its run time easily observed. If we change the number of partitions to extreme values and rerun these commands, we can get a feel for how the number of partitions should be tailored to the data.

Let us try a few different things. We'll raise the number of partitions to a very high number, lower it to a very small number, and keep the default. For each variant, we'll run a count to see how long it takes. Assuming the original data is not very large, the obvious hypothesis is that a very large number of partitions will slow the process down, as the system must deal with the overhead of each partition while none of them holds much data.
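Outside Databricks, where cells do not report their run time, an action can be timed with the standard library. A small sketch (the helper name is ours):

```python
import time

def time_action(action, *args):
    """Run any callable (e.g. df.count) and return (result, seconds taken)."""
    start = time.perf_counter()
    result = action(*args)
    return result, time.perf_counter() - start

# Works for any callable; with Spark you would pass df.count instead.
result, elapsed = time_action(sum, range(1_000_000))
print(result, f"{elapsed:.4f}s")
```

Note that because Spark is lazy, only actions (count, show, collect) trigger computation, so timing a transformation on its own would measure almost nothing.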

Current Partitioning

Working from the joined travel to work data - ttw_joined - we know we have 452 records. Let's use an RDD method to find the number of partitions.

print(ttw_joined.rdd.getNumPartitions())
## 200

200 partitions is probably not a good number to split 452 records across - that's only about 2 records per partition!

Let's run a simple action in a new cell - such as a count() - and see how long it takes to run.

print(ttw_joined.count())
## 452

For me, this took 1.02 seconds.

Let us also find the length of each partition, and some simple statistics: the minimum and maximum number of records per partition, the average, and the standard deviation of these values.

import numpy as np
l = ttw_joined.rdd.glom().map(len).collect()
print(l)
## [2, 3, 2, 2, 0, 4, 4, 2, 3, 2, 0, 2, 3, 3, 3, 2, 2, 2, 3, 2, 1, 3, 0, 2, 4, 2, 2, 2, 4, 2, 0, 3, 1, 2, 2, 3, 3, 4, 1, 3, 0, 3, 2, 2, 3, 3, 5, 3, 1, 2, 1, 1, 3, 2, 4, 5, 4, 1, 3, 1, 2, 2, 1, 2, 3, 1, 3, 4, 0, 1, 2, 3, 2, 2, 5, 2, 0, 1, 3, 1, 4, 3, 2, 5, 3, 2, 1, 1, 1, 2, 1, 1, 0, 1, 3, 1, 2, 3, 2, 0, 4, 1, 3, 2, 1, 1, 2, 2, 1, 2, 1, 2, 4, 1, 2, 0, 2, 0, 3, 2, 1, 3, 1, 3, 2, 5, 3, 5, 2, 1, 3, 1, 0, 0, 3, 4, 2, 3, 2, 2, 7, 2, 2, 1, 2, 4, 2, 2, 1, 2, 3, 1, 0, 3, 1, 4, 0, 4, 2, 6, 6, 3, 5, 2, 4, 2, 4, 3, 3, 1, 0, 2, 3, 2, 2, 0, 4, 3, 5, 4, 5, 1, 1, 2, 2, 1, 3, 2, 4, 3, 3, 1, 2, 3, 1, 1, 3, 4, 2, 2]
print(min(l), max(l), sum(l)/len(l), np.std(l))
## 0 7 2.26 1.3462540622037136

Some partitions are empty, while others hold 7 records. An even split would put about 2.26 records in every partition - not counts ranging anywhere from 0 to 7!

We can use Python's Counter class, from the collections module, to find out how many partitions hold each of 0 to 7 records:

from collections import Counter
print(Counter(l))
## Counter({2: 64, 3: 45, 1: 41, 4: 21, 0: 17, 5: 9, 6: 2, 7: 1})
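These checks can be bundled into a small plain-Python helper (the function name is ours; statistics.pstdev computes the population standard deviation, matching the default behaviour of np.std):

```python
import statistics
from collections import Counter

def partition_stats(lengths):
    """Summarise a list of per-partition record counts."""
    return {
        'min': min(lengths),
        'max': max(lengths),
        'mean': sum(lengths) / len(lengths),
        'stdev': statistics.pstdev(lengths),  # population std, like np.std
        'empty': sum(1 for n in lengths if n == 0),
        'histogram': Counter(lengths),
    }

# With Spark, pass df.rdd.glom().map(len).collect(); here, a toy list:
print(partition_stats([2, 0, 3, 2, 0, 4]))
```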


Reduce the Number of Partitions

Let us bring the number of partitions down to 8. To do this, we can use either coalesce(8) or repartition(8); coalesce merges existing partitions and avoids a full shuffle, so we will use it here. First, take a look at the new partition structure.

ttw_8_part = ttw_joined.coalesce(8)
l = ttw_8_part.rdd.glom().map(len).collect()
print(min(l), max(l), sum(l)/len(l), np.std(l))
## 45 66 56.5 6.96419413859206
print(Counter(l))
## Counter({58: 2, 56: 1, 45: 1, 46: 1, 61: 1, 66: 1, 62: 1})

And again, in a new cell, run a count.

print(ttw_8_part.count())
## 452

For me, this took 0.17 seconds - less than a fifth of the original time! We also have no empty partitions, and while not perfect, the partitioning isn't terrible. With a longer process involving multiple transformations, or a much larger data set, these savings really add up.


Increase the Number of Partitions

Now, let's raise the number of partitions, this time to 2000. Immediately this seems like a bad idea - with only 452 records, there will be at least 1548 empty partitions! As coalesce can only reduce the number of partitions, we need to use repartition(2000) to increase it.

ttw_2000_part = ttw_joined.repartition(2000)
l = ttw_2000_part.rdd.glom().map(len).collect()
print(min(l), max(l), sum(l)/len(l), np.std(l))
## 0 2 0.226 0.42181038394046205
print(Counter(l))
## Counter({0: 1551, 1: 446, 2: 3})
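The Counter output agrees with the simple arithmetic: whenever there are more partitions than records, the surplus partitions are guaranteed to be empty.

```python
# With n_partitions > n_records, at least (n_partitions - n_records)
# partitions must be empty, whatever the distribution.
n_records, n_partitions = 452, 2000
min_empty = max(0, n_partitions - n_records)
print(min_empty)  # 1548 - and the Counter above found 1551
```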

Clearly this data is not well distributed. The vast majority of partitions are empty, while the overwhelming majority of non-empty partitions have but a single record within them.

Finally, in a new cell, we run a count.

print(ttw_2000_part.count())
## 452

For me this took 3.24 seconds - about three times longer than the original, and roughly 19 times longer than the more efficient 8-partition version.
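The results suggest a simple rule of thumb: choose the partition count from the data size rather than accepting the default. A plain-Python sketch (the function name and the target of roughly 100,000 records per partition are our assumptions for illustration; in practice the target is tuned to record size and the number of cluster cores):

```python
import math

def suggest_partitions(n_records, target_per_partition=100_000,
                       min_parts=1, max_parts=2000):
    """Aim for roughly target_per_partition records in each partition."""
    parts = math.ceil(n_records / target_per_partition)
    return max(min_parts, min(parts, max_parts))

# For our 452 records, even a modest target keeps the count small:
print(suggest_partitions(452, target_per_partition=100))  # 5
```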


9.3 Intended Learning Outcomes

Now, you should:

  1. Feel confident dealing with data using pySpark.

  2. Be able to combine what we learnt in the previous Chapters to investigate and manipulate data.

  3. Be able to investigate the current partitioning of your DataFrame, calculate the optimal partitioning based on your data and your resources, and make the necessary changes to achieve it.


11 What next

  1. Spark’s Machine learning library https://spark.apache.org/docs/2.3.1/api/python/pyspark.ml.html/

  2. Graphs and graph-parallel computation https://spark.apache.org/graphx/

  3. Spark Streaming https://spark.apache.org/streaming/

  4. Using Scala in Spark

  5. Datasets

  6. Advanced Topics on Partitioning and Optimisation

  7. Resilient Distributed Datasets (RDDs)